##// END OF EJS Templates
wireproto: make version 2 @wireprotocommand an independent function...
Gregory Szorc -
r37798:8acd3a9a default
parent child Browse files
Show More
@@ -1,850 +1,809
1 1 # wireproto.py - generic wire protocol support functions
2 2 #
3 3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import os
11 11 import tempfile
12 12
13 13 from .i18n import _
14 14 from .node import (
15 15 hex,
16 16 nullid,
17 17 )
18 18
19 19 from . import (
20 20 bundle2,
21 21 changegroup as changegroupmod,
22 22 discovery,
23 23 encoding,
24 24 error,
25 25 exchange,
26 26 pushkey as pushkeymod,
27 27 pycompat,
28 28 streamclone,
29 29 util,
30 30 wireprototypes,
31 31 )
32 32
33 33 from .utils import (
34 34 procutil,
35 35 stringutil,
36 36 )
37 37
38 38 urlerr = util.urlerr
39 39 urlreq = util.urlreq
40 40
41 41 bundle2requiredmain = _('incompatible Mercurial client; bundle2 required')
42 42 bundle2requiredhint = _('see https://www.mercurial-scm.org/wiki/'
43 43 'IncompatibleClient')
44 44 bundle2required = '%s\n(%s)\n' % (bundle2requiredmain, bundle2requiredhint)
45 45
46 46 def clientcompressionsupport(proto):
47 47 """Returns a list of compression methods supported by the client.
48 48
49 49 Returns a list of the compression methods supported by the client
50 50 according to the protocol capabilities. If no such capability has
51 51 been announced, fallback to the default of zlib and uncompressed.
52 52 """
53 53 for cap in proto.getprotocaps():
54 54 if cap.startswith('comp='):
55 55 return cap[5:].split(',')
56 56 return ['zlib', 'none']
57 57
58 58 # wire protocol command can either return a string or one of these classes.
59 59
60 60 def getdispatchrepo(repo, proto, command):
61 61 """Obtain the repo used for processing wire protocol commands.
62 62
63 63 The intent of this function is to serve as a monkeypatch point for
64 64 extensions that need commands to operate on different repo views under
65 65 specialized circumstances.
66 66 """
67 67 return repo.filtered('served')
68 68
69 69 def dispatch(repo, proto, command):
70 70 repo = getdispatchrepo(repo, proto, command)
71 71
72 72 transportversion = wireprototypes.TRANSPORTS[proto.name]['version']
73 73 commandtable = commandsv2 if transportversion == 2 else commands
74 74 func, spec = commandtable[command]
75 75
76 76 args = proto.getargs(spec)
77 77
78 78 # Version 1 protocols define arguments as a list. Version 2 uses a dict.
79 79 if isinstance(args, list):
80 80 return func(repo, proto, *args)
81 81 elif isinstance(args, dict):
82 82 return func(repo, proto, **args)
83 83 else:
84 84 raise error.ProgrammingError('unexpected type returned from '
85 85 'proto.getargs(): %s' % type(args))
86 86
87 87 def options(cmd, keys, others):
88 88 opts = {}
89 89 for k in keys:
90 90 if k in others:
91 91 opts[k] = others[k]
92 92 del others[k]
93 93 if others:
94 94 procutil.stderr.write("warning: %s ignored unexpected arguments %s\n"
95 95 % (cmd, ",".join(others)))
96 96 return opts
97 97
98 98 def bundle1allowed(repo, action):
99 99 """Whether a bundle1 operation is allowed from the server.
100 100
101 101 Priority is:
102 102
103 103 1. server.bundle1gd.<action> (if generaldelta active)
104 104 2. server.bundle1.<action>
105 105 3. server.bundle1gd (if generaldelta active)
106 106 4. server.bundle1
107 107 """
108 108 ui = repo.ui
109 109 gd = 'generaldelta' in repo.requirements
110 110
111 111 if gd:
112 112 v = ui.configbool('server', 'bundle1gd.%s' % action)
113 113 if v is not None:
114 114 return v
115 115
116 116 v = ui.configbool('server', 'bundle1.%s' % action)
117 117 if v is not None:
118 118 return v
119 119
120 120 if gd:
121 121 v = ui.configbool('server', 'bundle1gd')
122 122 if v is not None:
123 123 return v
124 124
125 125 return ui.configbool('server', 'bundle1')
126 126
127 127 def supportedcompengines(ui, role):
128 128 """Obtain the list of supported compression engines for a request."""
129 129 assert role in (util.CLIENTROLE, util.SERVERROLE)
130 130
131 131 compengines = util.compengines.supportedwireengines(role)
132 132
133 133 # Allow config to override default list and ordering.
134 134 if role == util.SERVERROLE:
135 135 configengines = ui.configlist('server', 'compressionengines')
136 136 config = 'server.compressionengines'
137 137 else:
138 138 # This is currently implemented mainly to facilitate testing. In most
139 139 # cases, the server should be in charge of choosing a compression engine
140 140 # because a server has the most to lose from a sub-optimal choice. (e.g.
141 141 # CPU DoS due to an expensive engine or a network DoS due to poor
142 142 # compression ratio).
143 143 configengines = ui.configlist('experimental',
144 144 'clientcompressionengines')
145 145 config = 'experimental.clientcompressionengines'
146 146
147 147 # No explicit config. Filter out the ones that aren't supposed to be
148 148 # advertised and return default ordering.
149 149 if not configengines:
150 150 attr = 'serverpriority' if role == util.SERVERROLE else 'clientpriority'
151 151 return [e for e in compengines
152 152 if getattr(e.wireprotosupport(), attr) > 0]
153 153
154 154 # If compression engines are listed in the config, assume there is a good
155 155 # reason for it (like server operators wanting to achieve specific
156 156 # performance characteristics). So fail fast if the config references
157 157 # unusable compression engines.
158 158 validnames = set(e.name() for e in compengines)
159 159 invalidnames = set(e for e in configengines if e not in validnames)
160 160 if invalidnames:
161 161 raise error.Abort(_('invalid compression engine defined in %s: %s') %
162 162 (config, ', '.join(sorted(invalidnames))))
163 163
164 164 compengines = [e for e in compengines if e.name() in configengines]
165 165 compengines = sorted(compengines,
166 166 key=lambda e: configengines.index(e.name()))
167 167
168 168 if not compengines:
169 169 raise error.Abort(_('%s config option does not specify any known '
170 170 'compression engines') % config,
171 171 hint=_('usable compression engines: %s') %
172 172 ', '.sorted(validnames))
173 173
174 174 return compengines
175 175
176 176 class commandentry(object):
177 177 """Represents a declared wire protocol command."""
178 178 def __init__(self, func, args='', transports=None,
179 179 permission='push'):
180 180 self.func = func
181 181 self.args = args
182 182 self.transports = transports or set()
183 183 self.permission = permission
184 184
185 185 def _merge(self, func, args):
186 186 """Merge this instance with an incoming 2-tuple.
187 187
188 188 This is called when a caller using the old 2-tuple API attempts
189 189 to replace an instance. The incoming values are merged with
190 190 data not captured by the 2-tuple and a new instance containing
191 191 the union of the two objects is returned.
192 192 """
193 193 return commandentry(func, args=args, transports=set(self.transports),
194 194 permission=self.permission)
195 195
196 196 # Old code treats instances as 2-tuples. So expose that interface.
197 197 def __iter__(self):
198 198 yield self.func
199 199 yield self.args
200 200
201 201 def __getitem__(self, i):
202 202 if i == 0:
203 203 return self.func
204 204 elif i == 1:
205 205 return self.args
206 206 else:
207 207 raise IndexError('can only access elements 0 and 1')
208 208
209 209 class commanddict(dict):
210 210 """Container for registered wire protocol commands.
211 211
212 212 It behaves like a dict. But __setitem__ is overwritten to allow silent
213 213 coercion of values from 2-tuples for API compatibility.
214 214 """
215 215 def __setitem__(self, k, v):
216 216 if isinstance(v, commandentry):
217 217 pass
218 218 # Cast 2-tuples to commandentry instances.
219 219 elif isinstance(v, tuple):
220 220 if len(v) != 2:
221 221 raise ValueError('command tuples must have exactly 2 elements')
222 222
223 223 # It is common for extensions to wrap wire protocol commands via
224 224 # e.g. ``wireproto.commands[x] = (newfn, args)``. Because callers
225 225 # doing this aren't aware of the new API that uses objects to store
226 226 # command entries, we automatically merge old state with new.
227 227 if k in self:
228 228 v = self[k]._merge(v[0], v[1])
229 229 else:
230 230 # Use default values from @wireprotocommand.
231 231 v = commandentry(v[0], args=v[1],
232 232 transports=set(wireprototypes.TRANSPORTS),
233 233 permission='push')
234 234 else:
235 235 raise ValueError('command entries must be commandentry instances '
236 236 'or 2-tuples')
237 237
238 238 return super(commanddict, self).__setitem__(k, v)
239 239
240 240 def commandavailable(self, command, proto):
241 241 """Determine if a command is available for the requested protocol."""
242 242 assert proto.name in wireprototypes.TRANSPORTS
243 243
244 244 entry = self.get(command)
245 245
246 246 if not entry:
247 247 return False
248 248
249 249 if proto.name not in entry.transports:
250 250 return False
251 251
252 252 return True
253 253
254 # Constants specifying which transports a wire protocol command should be
255 # available on. For use with @wireprotocommand.
256 POLICY_V1_ONLY = 'v1-only'
257 POLICY_V2_ONLY = 'v2-only'
258
259 254 # For version 1 transports.
260 255 commands = commanddict()
261 256
262 257 # For version 2 transports.
263 258 commandsv2 = commanddict()
264 259
265 def wireprotocommand(name, args=None, transportpolicy=POLICY_V1_ONLY,
266 permission='push'):
260 def wireprotocommand(name, args=None, permission='push'):
267 261 """Decorator to declare a wire protocol command.
268 262
269 263 ``name`` is the name of the wire protocol command being provided.
270 264
271 265 ``args`` defines the named arguments accepted by the command. It is
272 ideally a dict mapping argument names to their types. For backwards
273 compatibility, it can be a space-delimited list of argument names. For
274 version 1 transports, ``*`` denotes a special value that says to accept
275 all named arguments.
276
277 ``transportpolicy`` is a POLICY_* constant denoting which transports
278 this wire protocol command should be exposed to. By default, commands
279 are exposed to all wire protocol transports.
266 a space-delimited list of argument names. ``*`` denotes a special value
267 that says to accept all named arguments.
280 268
281 269 ``permission`` defines the permission type needed to run this command.
282 270 Can be ``push`` or ``pull``. These roughly map to read-write and read-only,
283 271 respectively. Default is to assume command requires ``push`` permissions
284 272 because otherwise commands not declaring their permissions could modify
285 273 a repository that is supposed to be read-only.
286 274 """
287 if transportpolicy == POLICY_V1_ONLY:
288 transports = {k for k, v in wireprototypes.TRANSPORTS.items()
289 if v['version'] == 1}
290 transportversion = 1
291 elif transportpolicy == POLICY_V2_ONLY:
292 transports = {k for k, v in wireprototypes.TRANSPORTS.items()
293 if v['version'] == 2}
294 transportversion = 2
295 else:
296 raise error.ProgrammingError('invalid transport policy value: %s' %
297 transportpolicy)
275 transports = {k for k, v in wireprototypes.TRANSPORTS.items()
276 if v['version'] == 1}
298 277
299 278 # Because SSHv2 is a mirror of SSHv1, we allow "batch" commands through to
300 279 # SSHv2.
301 280 # TODO undo this hack when SSH is using the unified frame protocol.
302 281 if name == b'batch':
303 282 transports.add(wireprototypes.SSHV2)
304 283
305 284 if permission not in ('push', 'pull'):
306 285 raise error.ProgrammingError('invalid wire protocol permission; '
307 286 'got %s; expected "push" or "pull"' %
308 287 permission)
309 288
310 if transportversion == 1:
311 if args is None:
312 args = ''
289 if args is None:
290 args = ''
313 291
314 if not isinstance(args, bytes):
315 raise error.ProgrammingError('arguments for version 1 commands '
316 'must be declared as bytes')
317 elif transportversion == 2:
318 if args is None:
319 args = {}
320
321 if not isinstance(args, dict):
322 raise error.ProgrammingError('arguments for version 2 commands '
323 'must be declared as dicts')
292 if not isinstance(args, bytes):
293 raise error.ProgrammingError('arguments for version 1 commands '
294 'must be declared as bytes')
324 295
325 296 def register(func):
326 if transportversion == 1:
327 if name in commands:
328 raise error.ProgrammingError('%s command already registered '
329 'for version 1' % name)
330 commands[name] = commandentry(func, args=args,
331 transports=transports,
332 permission=permission)
333 elif transportversion == 2:
334 if name in commandsv2:
335 raise error.ProgrammingError('%s command already registered '
336 'for version 2' % name)
337
338 commandsv2[name] = commandentry(func, args=args,
339 transports=transports,
340 permission=permission)
341 else:
342 raise error.ProgrammingError('unhandled transport version: %d' %
343 transportversion)
297 if name in commands:
298 raise error.ProgrammingError('%s command already registered '
299 'for version 1' % name)
300 commands[name] = commandentry(func, args=args,
301 transports=transports,
302 permission=permission)
344 303
345 304 return func
346 305 return register
347 306
348 307 # TODO define a more appropriate permissions type to use for this.
349 308 @wireprotocommand('batch', 'cmds *', permission='pull')
350 309 def batch(repo, proto, cmds, others):
351 310 unescapearg = wireprototypes.unescapebatcharg
352 311 repo = repo.filtered("served")
353 312 res = []
354 313 for pair in cmds.split(';'):
355 314 op, args = pair.split(' ', 1)
356 315 vals = {}
357 316 for a in args.split(','):
358 317 if a:
359 318 n, v = a.split('=')
360 319 vals[unescapearg(n)] = unescapearg(v)
361 320 func, spec = commands[op]
362 321
363 322 # Validate that client has permissions to perform this command.
364 323 perm = commands[op].permission
365 324 assert perm in ('push', 'pull')
366 325 proto.checkperm(perm)
367 326
368 327 if spec:
369 328 keys = spec.split()
370 329 data = {}
371 330 for k in keys:
372 331 if k == '*':
373 332 star = {}
374 333 for key in vals.keys():
375 334 if key not in keys:
376 335 star[key] = vals[key]
377 336 data['*'] = star
378 337 else:
379 338 data[k] = vals[k]
380 339 result = func(repo, proto, *[data[k] for k in keys])
381 340 else:
382 341 result = func(repo, proto)
383 342 if isinstance(result, wireprototypes.ooberror):
384 343 return result
385 344
386 345 # For now, all batchable commands must return bytesresponse or
387 346 # raw bytes (for backwards compatibility).
388 347 assert isinstance(result, (wireprototypes.bytesresponse, bytes))
389 348 if isinstance(result, wireprototypes.bytesresponse):
390 349 result = result.data
391 350 res.append(wireprototypes.escapebatcharg(result))
392 351
393 352 return wireprototypes.bytesresponse(';'.join(res))
394 353
395 354 @wireprotocommand('between', 'pairs', permission='pull')
396 355 def between(repo, proto, pairs):
397 356 pairs = [wireprototypes.decodelist(p, '-') for p in pairs.split(" ")]
398 357 r = []
399 358 for b in repo.between(pairs):
400 359 r.append(wireprototypes.encodelist(b) + "\n")
401 360
402 361 return wireprototypes.bytesresponse(''.join(r))
403 362
404 363 @wireprotocommand('branchmap', permission='pull')
405 364 def branchmap(repo, proto):
406 365 branchmap = repo.branchmap()
407 366 heads = []
408 367 for branch, nodes in branchmap.iteritems():
409 368 branchname = urlreq.quote(encoding.fromlocal(branch))
410 369 branchnodes = wireprototypes.encodelist(nodes)
411 370 heads.append('%s %s' % (branchname, branchnodes))
412 371
413 372 return wireprototypes.bytesresponse('\n'.join(heads))
414 373
415 374 @wireprotocommand('branches', 'nodes', permission='pull')
416 375 def branches(repo, proto, nodes):
417 376 nodes = wireprototypes.decodelist(nodes)
418 377 r = []
419 378 for b in repo.branches(nodes):
420 379 r.append(wireprototypes.encodelist(b) + "\n")
421 380
422 381 return wireprototypes.bytesresponse(''.join(r))
423 382
424 383 @wireprotocommand('clonebundles', '', permission='pull')
425 384 def clonebundles(repo, proto):
426 385 """Server command for returning info for available bundles to seed clones.
427 386
428 387 Clients will parse this response and determine what bundle to fetch.
429 388
430 389 Extensions may wrap this command to filter or dynamically emit data
431 390 depending on the request. e.g. you could advertise URLs for the closest
432 391 data center given the client's IP address.
433 392 """
434 393 return wireprototypes.bytesresponse(
435 394 repo.vfs.tryread('clonebundles.manifest'))
436 395
437 396 wireprotocaps = ['lookup', 'branchmap', 'pushkey',
438 397 'known', 'getbundle', 'unbundlehash']
439 398
440 399 def _capabilities(repo, proto):
441 400 """return a list of capabilities for a repo
442 401
443 402 This function exists to allow extensions to easily wrap capabilities
444 403 computation
445 404
446 405 - returns a lists: easy to alter
447 406 - change done here will be propagated to both `capabilities` and `hello`
448 407 command without any other action needed.
449 408 """
450 409 # copy to prevent modification of the global list
451 410 caps = list(wireprotocaps)
452 411
453 412 # Command of same name as capability isn't exposed to version 1 of
454 413 # transports. So conditionally add it.
455 414 if commands.commandavailable('changegroupsubset', proto):
456 415 caps.append('changegroupsubset')
457 416
458 417 if streamclone.allowservergeneration(repo):
459 418 if repo.ui.configbool('server', 'preferuncompressed'):
460 419 caps.append('stream-preferred')
461 420 requiredformats = repo.requirements & repo.supportedformats
462 421 # if our local revlogs are just revlogv1, add 'stream' cap
463 422 if not requiredformats - {'revlogv1'}:
464 423 caps.append('stream')
465 424 # otherwise, add 'streamreqs' detailing our local revlog format
466 425 else:
467 426 caps.append('streamreqs=%s' % ','.join(sorted(requiredformats)))
468 427 if repo.ui.configbool('experimental', 'bundle2-advertise'):
469 428 capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo, role='server'))
470 429 caps.append('bundle2=' + urlreq.quote(capsblob))
471 430 caps.append('unbundle=%s' % ','.join(bundle2.bundlepriority))
472 431
473 432 return proto.addcapabilities(repo, caps)
474 433
475 434 # If you are writing an extension and consider wrapping this function. Wrap
476 435 # `_capabilities` instead.
477 436 @wireprotocommand('capabilities', permission='pull')
478 437 def capabilities(repo, proto):
479 438 caps = _capabilities(repo, proto)
480 439 return wireprototypes.bytesresponse(' '.join(sorted(caps)))
481 440
482 441 @wireprotocommand('changegroup', 'roots', permission='pull')
483 442 def changegroup(repo, proto, roots):
484 443 nodes = wireprototypes.decodelist(roots)
485 444 outgoing = discovery.outgoing(repo, missingroots=nodes,
486 445 missingheads=repo.heads())
487 446 cg = changegroupmod.makechangegroup(repo, outgoing, '01', 'serve')
488 447 gen = iter(lambda: cg.read(32768), '')
489 448 return wireprototypes.streamres(gen=gen)
490 449
491 450 @wireprotocommand('changegroupsubset', 'bases heads',
492 451 permission='pull')
493 452 def changegroupsubset(repo, proto, bases, heads):
494 453 bases = wireprototypes.decodelist(bases)
495 454 heads = wireprototypes.decodelist(heads)
496 455 outgoing = discovery.outgoing(repo, missingroots=bases,
497 456 missingheads=heads)
498 457 cg = changegroupmod.makechangegroup(repo, outgoing, '01', 'serve')
499 458 gen = iter(lambda: cg.read(32768), '')
500 459 return wireprototypes.streamres(gen=gen)
501 460
502 461 @wireprotocommand('debugwireargs', 'one two *',
503 462 permission='pull')
504 463 def debugwireargs(repo, proto, one, two, others):
505 464 # only accept optional args from the known set
506 465 opts = options('debugwireargs', ['three', 'four'], others)
507 466 return wireprototypes.bytesresponse(repo.debugwireargs(
508 467 one, two, **pycompat.strkwargs(opts)))
509 468
510 469 def find_pullbundle(repo, proto, opts, clheads, heads, common):
511 470 """Return a file object for the first matching pullbundle.
512 471
513 472 Pullbundles are specified in .hg/pullbundles.manifest similar to
514 473 clonebundles.
515 474 For each entry, the bundle specification is checked for compatibility:
516 475 - Client features vs the BUNDLESPEC.
517 476 - Revisions shared with the clients vs base revisions of the bundle.
518 477 A bundle can be applied only if all its base revisions are known by
519 478 the client.
520 479 - At least one leaf of the bundle's DAG is missing on the client.
521 480 - Every leaf of the bundle's DAG is part of node set the client wants.
522 481 E.g. do not send a bundle of all changes if the client wants only
523 482 one specific branch of many.
524 483 """
525 484 def decodehexstring(s):
526 485 return set([h.decode('hex') for h in s.split(';')])
527 486
528 487 manifest = repo.vfs.tryread('pullbundles.manifest')
529 488 if not manifest:
530 489 return None
531 490 res = exchange.parseclonebundlesmanifest(repo, manifest)
532 491 res = exchange.filterclonebundleentries(repo, res)
533 492 if not res:
534 493 return None
535 494 cl = repo.changelog
536 495 heads_anc = cl.ancestors([cl.rev(rev) for rev in heads], inclusive=True)
537 496 common_anc = cl.ancestors([cl.rev(rev) for rev in common], inclusive=True)
538 497 compformats = clientcompressionsupport(proto)
539 498 for entry in res:
540 499 if 'COMPRESSION' in entry and entry['COMPRESSION'] not in compformats:
541 500 continue
542 501 # No test yet for VERSION, since V2 is supported by any client
543 502 # that advertises partial pulls
544 503 if 'heads' in entry:
545 504 try:
546 505 bundle_heads = decodehexstring(entry['heads'])
547 506 except TypeError:
548 507 # Bad heads entry
549 508 continue
550 509 if bundle_heads.issubset(common):
551 510 continue # Nothing new
552 511 if all(cl.rev(rev) in common_anc for rev in bundle_heads):
553 512 continue # Still nothing new
554 513 if any(cl.rev(rev) not in heads_anc and
555 514 cl.rev(rev) not in common_anc for rev in bundle_heads):
556 515 continue
557 516 if 'bases' in entry:
558 517 try:
559 518 bundle_bases = decodehexstring(entry['bases'])
560 519 except TypeError:
561 520 # Bad bases entry
562 521 continue
563 522 if not all(cl.rev(rev) in common_anc for rev in bundle_bases):
564 523 continue
565 524 path = entry['URL']
566 525 repo.ui.debug('sending pullbundle "%s"\n' % path)
567 526 try:
568 527 return repo.vfs.open(path)
569 528 except IOError:
570 529 repo.ui.debug('pullbundle "%s" not accessible\n' % path)
571 530 continue
572 531 return None
573 532
574 533 @wireprotocommand('getbundle', '*', permission='pull')
575 534 def getbundle(repo, proto, others):
576 535 opts = options('getbundle', wireprototypes.GETBUNDLE_ARGUMENTS.keys(),
577 536 others)
578 537 for k, v in opts.iteritems():
579 538 keytype = wireprototypes.GETBUNDLE_ARGUMENTS[k]
580 539 if keytype == 'nodes':
581 540 opts[k] = wireprototypes.decodelist(v)
582 541 elif keytype == 'csv':
583 542 opts[k] = list(v.split(','))
584 543 elif keytype == 'scsv':
585 544 opts[k] = set(v.split(','))
586 545 elif keytype == 'boolean':
587 546 # Client should serialize False as '0', which is a non-empty string
588 547 # so it evaluates as a True bool.
589 548 if v == '0':
590 549 opts[k] = False
591 550 else:
592 551 opts[k] = bool(v)
593 552 elif keytype != 'plain':
594 553 raise KeyError('unknown getbundle option type %s'
595 554 % keytype)
596 555
597 556 if not bundle1allowed(repo, 'pull'):
598 557 if not exchange.bundle2requested(opts.get('bundlecaps')):
599 558 if proto.name == 'http-v1':
600 559 return wireprototypes.ooberror(bundle2required)
601 560 raise error.Abort(bundle2requiredmain,
602 561 hint=bundle2requiredhint)
603 562
604 563 prefercompressed = True
605 564
606 565 try:
607 566 clheads = set(repo.changelog.heads())
608 567 heads = set(opts.get('heads', set()))
609 568 common = set(opts.get('common', set()))
610 569 common.discard(nullid)
611 570 if (repo.ui.configbool('server', 'pullbundle') and
612 571 'partial-pull' in proto.getprotocaps()):
613 572 # Check if a pre-built bundle covers this request.
614 573 bundle = find_pullbundle(repo, proto, opts, clheads, heads, common)
615 574 if bundle:
616 575 return wireprototypes.streamres(gen=util.filechunkiter(bundle),
617 576 prefer_uncompressed=True)
618 577
619 578 if repo.ui.configbool('server', 'disablefullbundle'):
620 579 # Check to see if this is a full clone.
621 580 changegroup = opts.get('cg', True)
622 581 if changegroup and not common and clheads == heads:
623 582 raise error.Abort(
624 583 _('server has pull-based clones disabled'),
625 584 hint=_('remove --pull if specified or upgrade Mercurial'))
626 585
627 586 info, chunks = exchange.getbundlechunks(repo, 'serve',
628 587 **pycompat.strkwargs(opts))
629 588 prefercompressed = info.get('prefercompressed', True)
630 589 except error.Abort as exc:
631 590 # cleanly forward Abort error to the client
632 591 if not exchange.bundle2requested(opts.get('bundlecaps')):
633 592 if proto.name == 'http-v1':
634 593 return wireprototypes.ooberror(pycompat.bytestr(exc) + '\n')
635 594 raise # cannot do better for bundle1 + ssh
636 595 # bundle2 request expect a bundle2 reply
637 596 bundler = bundle2.bundle20(repo.ui)
638 597 manargs = [('message', pycompat.bytestr(exc))]
639 598 advargs = []
640 599 if exc.hint is not None:
641 600 advargs.append(('hint', exc.hint))
642 601 bundler.addpart(bundle2.bundlepart('error:abort',
643 602 manargs, advargs))
644 603 chunks = bundler.getchunks()
645 604 prefercompressed = False
646 605
647 606 return wireprototypes.streamres(
648 607 gen=chunks, prefer_uncompressed=not prefercompressed)
649 608
650 609 @wireprotocommand('heads', permission='pull')
651 610 def heads(repo, proto):
652 611 h = repo.heads()
653 612 return wireprototypes.bytesresponse(wireprototypes.encodelist(h) + '\n')
654 613
655 614 @wireprotocommand('hello', permission='pull')
656 615 def hello(repo, proto):
657 616 """Called as part of SSH handshake to obtain server info.
658 617
659 618 Returns a list of lines describing interesting things about the
660 619 server, in an RFC822-like format.
661 620
662 621 Currently, the only one defined is ``capabilities``, which consists of a
663 622 line of space separated tokens describing server abilities:
664 623
665 624 capabilities: <token0> <token1> <token2>
666 625 """
667 626 caps = capabilities(repo, proto).data
668 627 return wireprototypes.bytesresponse('capabilities: %s\n' % caps)
669 628
670 629 @wireprotocommand('listkeys', 'namespace', permission='pull')
671 630 def listkeys(repo, proto, namespace):
672 631 d = sorted(repo.listkeys(encoding.tolocal(namespace)).items())
673 632 return wireprototypes.bytesresponse(pushkeymod.encodekeys(d))
674 633
675 634 @wireprotocommand('lookup', 'key', permission='pull')
676 635 def lookup(repo, proto, key):
677 636 try:
678 637 k = encoding.tolocal(key)
679 638 n = repo.lookup(k)
680 639 r = hex(n)
681 640 success = 1
682 641 except Exception as inst:
683 642 r = stringutil.forcebytestr(inst)
684 643 success = 0
685 644 return wireprototypes.bytesresponse('%d %s\n' % (success, r))
686 645
687 646 @wireprotocommand('known', 'nodes *', permission='pull')
688 647 def known(repo, proto, nodes, others):
689 648 v = ''.join(b and '1' or '0'
690 649 for b in repo.known(wireprototypes.decodelist(nodes)))
691 650 return wireprototypes.bytesresponse(v)
692 651
693 652 @wireprotocommand('protocaps', 'caps', permission='pull')
694 653 def protocaps(repo, proto, caps):
695 654 if proto.name == wireprototypes.SSHV1:
696 655 proto._protocaps = set(caps.split(' '))
697 656 return wireprototypes.bytesresponse('OK')
698 657
699 658 @wireprotocommand('pushkey', 'namespace key old new', permission='push')
700 659 def pushkey(repo, proto, namespace, key, old, new):
701 660 # compatibility with pre-1.8 clients which were accidentally
702 661 # sending raw binary nodes rather than utf-8-encoded hex
703 662 if len(new) == 20 and stringutil.escapestr(new) != new:
704 663 # looks like it could be a binary node
705 664 try:
706 665 new.decode('utf-8')
707 666 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
708 667 except UnicodeDecodeError:
709 668 pass # binary, leave unmodified
710 669 else:
711 670 new = encoding.tolocal(new) # normal path
712 671
713 672 with proto.mayberedirectstdio() as output:
714 673 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
715 674 encoding.tolocal(old), new) or False
716 675
717 676 output = output.getvalue() if output else ''
718 677 return wireprototypes.bytesresponse('%d\n%s' % (int(r), output))
719 678
720 679 @wireprotocommand('stream_out', permission='pull')
721 680 def stream(repo, proto):
722 681 '''If the server supports streaming clone, it advertises the "stream"
723 682 capability with a value representing the version and flags of the repo
724 683 it is serving. Client checks to see if it understands the format.
725 684 '''
726 685 return wireprototypes.streamreslegacy(
727 686 streamclone.generatev1wireproto(repo))
728 687
729 688 @wireprotocommand('unbundle', 'heads', permission='push')
730 689 def unbundle(repo, proto, heads):
731 690 their_heads = wireprototypes.decodelist(heads)
732 691
733 692 with proto.mayberedirectstdio() as output:
734 693 try:
735 694 exchange.check_heads(repo, their_heads, 'preparing changes')
736 695 cleanup = lambda: None
737 696 try:
738 697 payload = proto.getpayload()
739 698 if repo.ui.configbool('server', 'streamunbundle'):
740 699 def cleanup():
741 700 # Ensure that the full payload is consumed, so
742 701 # that the connection doesn't contain trailing garbage.
743 702 for p in payload:
744 703 pass
745 704 fp = util.chunkbuffer(payload)
746 705 else:
747 706 # write bundle data to temporary file as it can be big
748 707 fp, tempname = None, None
749 708 def cleanup():
750 709 if fp:
751 710 fp.close()
752 711 if tempname:
753 712 os.unlink(tempname)
754 713 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
755 714 repo.ui.debug('redirecting incoming bundle to %s\n' %
756 715 tempname)
757 716 fp = os.fdopen(fd, pycompat.sysstr('wb+'))
758 717 r = 0
759 718 for p in payload:
760 719 fp.write(p)
761 720 fp.seek(0)
762 721
763 722 gen = exchange.readbundle(repo.ui, fp, None)
764 723 if (isinstance(gen, changegroupmod.cg1unpacker)
765 724 and not bundle1allowed(repo, 'push')):
766 725 if proto.name == 'http-v1':
767 726 # need to special case http because stderr do not get to
768 727 # the http client on failed push so we need to abuse
769 728 # some other error type to make sure the message get to
770 729 # the user.
771 730 return wireprototypes.ooberror(bundle2required)
772 731 raise error.Abort(bundle2requiredmain,
773 732 hint=bundle2requiredhint)
774 733
775 734 r = exchange.unbundle(repo, gen, their_heads, 'serve',
776 735 proto.client())
777 736 if util.safehasattr(r, 'addpart'):
778 737 # The return looks streamable, we are in the bundle2 case
779 738 # and should return a stream.
780 739 return wireprototypes.streamreslegacy(gen=r.getchunks())
781 740 return wireprototypes.pushres(
782 741 r, output.getvalue() if output else '')
783 742
784 743 finally:
785 744 cleanup()
786 745
787 746 except (error.BundleValueError, error.Abort, error.PushRaced) as exc:
788 747 # handle non-bundle2 case first
789 748 if not getattr(exc, 'duringunbundle2', False):
790 749 try:
791 750 raise
792 751 except error.Abort:
793 752 # The old code we moved used procutil.stderr directly.
794 753 # We did not change it to minimise code change.
795 754 # This need to be moved to something proper.
796 755 # Feel free to do it.
797 756 procutil.stderr.write("abort: %s\n" % exc)
798 757 if exc.hint is not None:
799 758 procutil.stderr.write("(%s)\n" % exc.hint)
800 759 procutil.stderr.flush()
801 760 return wireprototypes.pushres(
802 761 0, output.getvalue() if output else '')
803 762 except error.PushRaced:
804 763 return wireprototypes.pusherr(
805 764 pycompat.bytestr(exc),
806 765 output.getvalue() if output else '')
807 766
808 767 bundler = bundle2.bundle20(repo.ui)
809 768 for out in getattr(exc, '_bundle2salvagedoutput', ()):
810 769 bundler.addpart(out)
811 770 try:
812 771 try:
813 772 raise
814 773 except error.PushkeyFailed as exc:
815 774 # check client caps
816 775 remotecaps = getattr(exc, '_replycaps', None)
817 776 if (remotecaps is not None
818 777 and 'pushkey' not in remotecaps.get('error', ())):
819 778 # no support remote side, fallback to Abort handler.
820 779 raise
821 780 part = bundler.newpart('error:pushkey')
822 781 part.addparam('in-reply-to', exc.partid)
823 782 if exc.namespace is not None:
824 783 part.addparam('namespace', exc.namespace,
825 784 mandatory=False)
826 785 if exc.key is not None:
827 786 part.addparam('key', exc.key, mandatory=False)
828 787 if exc.new is not None:
829 788 part.addparam('new', exc.new, mandatory=False)
830 789 if exc.old is not None:
831 790 part.addparam('old', exc.old, mandatory=False)
832 791 if exc.ret is not None:
833 792 part.addparam('ret', exc.ret, mandatory=False)
834 793 except error.BundleValueError as exc:
835 794 errpart = bundler.newpart('error:unsupportedcontent')
836 795 if exc.parttype is not None:
837 796 errpart.addparam('parttype', exc.parttype)
838 797 if exc.params:
839 798 errpart.addparam('params', '\0'.join(exc.params))
840 799 except error.Abort as exc:
841 800 manargs = [('message', stringutil.forcebytestr(exc))]
842 801 advargs = []
843 802 if exc.hint is not None:
844 803 advargs.append(('hint', exc.hint))
845 804 bundler.addpart(bundle2.bundlepart('error:abort',
846 805 manargs, advargs))
847 806 except error.PushRaced as exc:
848 807 bundler.newpart('error:pushraced',
849 808 [('message', stringutil.forcebytestr(exc))])
850 809 return wireprototypes.streamreslegacy(gen=bundler.getchunks())
@@ -1,489 +1,522
1 1 # Copyright 21 May 2005 - (c) 2005 Jake Edge <jake@edge2.net>
2 2 # Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
3 3 #
4 4 # This software may be used and distributed according to the terms of the
5 5 # GNU General Public License version 2 or any later version.
6 6
7 7 from __future__ import absolute_import
8 8
9 9 import contextlib
10 10
11 11 from .i18n import _
12 12 from .thirdparty import (
13 13 cbor,
14 14 )
15 15 from .thirdparty.zope import (
16 16 interface as zi,
17 17 )
18 18 from . import (
19 19 encoding,
20 20 error,
21 21 pycompat,
22 22 streamclone,
23 23 util,
24 24 wireproto,
25 25 wireprotoframing,
26 26 wireprototypes,
27 27 )
28 28
29 29 FRAMINGTYPE = b'application/mercurial-exp-framing-0005'
30 30
31 31 HTTP_WIREPROTO_V2 = wireprototypes.HTTP_WIREPROTO_V2
32 32
33 33 def handlehttpv2request(rctx, req, res, checkperm, urlparts):
34 34 from .hgweb import common as hgwebcommon
35 35
36 36 # URL space looks like: <permissions>/<command>, where <permission> can
37 37 # be ``ro`` or ``rw`` to signal read-only or read-write, respectively.
38 38
39 39 # Root URL does nothing meaningful... yet.
40 40 if not urlparts:
41 41 res.status = b'200 OK'
42 42 res.headers[b'Content-Type'] = b'text/plain'
43 43 res.setbodybytes(_('HTTP version 2 API handler'))
44 44 return
45 45
46 46 if len(urlparts) == 1:
47 47 res.status = b'404 Not Found'
48 48 res.headers[b'Content-Type'] = b'text/plain'
49 49 res.setbodybytes(_('do not know how to process %s\n') %
50 50 req.dispatchpath)
51 51 return
52 52
53 53 permission, command = urlparts[0:2]
54 54
55 55 if permission not in (b'ro', b'rw'):
56 56 res.status = b'404 Not Found'
57 57 res.headers[b'Content-Type'] = b'text/plain'
58 58 res.setbodybytes(_('unknown permission: %s') % permission)
59 59 return
60 60
61 61 if req.method != 'POST':
62 62 res.status = b'405 Method Not Allowed'
63 63 res.headers[b'Allow'] = b'POST'
64 64 res.setbodybytes(_('commands require POST requests'))
65 65 return
66 66
67 67 # At some point we'll want to use our own API instead of recycling the
68 68 # behavior of version 1 of the wire protocol...
69 69 # TODO return reasonable responses - not responses that overload the
70 70 # HTTP status line message for error reporting.
71 71 try:
72 72 checkperm(rctx, req, 'pull' if permission == b'ro' else 'push')
73 73 except hgwebcommon.ErrorResponse as e:
74 74 res.status = hgwebcommon.statusmessage(e.code, pycompat.bytestr(e))
75 75 for k, v in e.headers:
76 76 res.headers[k] = v
77 77 res.setbodybytes('permission denied')
78 78 return
79 79
80 80 # We have a special endpoint to reflect the request back at the client.
81 81 if command == b'debugreflect':
82 82 _processhttpv2reflectrequest(rctx.repo.ui, rctx.repo, req, res)
83 83 return
84 84
85 85 # Extra commands that we handle that aren't really wire protocol
86 86 # commands. Think extra hard before making this hackery available to
87 87 # extension.
88 88 extracommands = {'multirequest'}
89 89
90 90 if command not in wireproto.commandsv2 and command not in extracommands:
91 91 res.status = b'404 Not Found'
92 92 res.headers[b'Content-Type'] = b'text/plain'
93 93 res.setbodybytes(_('unknown wire protocol command: %s\n') % command)
94 94 return
95 95
96 96 repo = rctx.repo
97 97 ui = repo.ui
98 98
99 99 proto = httpv2protocolhandler(req, ui)
100 100
101 101 if (not wireproto.commandsv2.commandavailable(command, proto)
102 102 and command not in extracommands):
103 103 res.status = b'404 Not Found'
104 104 res.headers[b'Content-Type'] = b'text/plain'
105 105 res.setbodybytes(_('invalid wire protocol command: %s') % command)
106 106 return
107 107
108 108 # TODO consider cases where proxies may add additional Accept headers.
109 109 if req.headers.get(b'Accept') != FRAMINGTYPE:
110 110 res.status = b'406 Not Acceptable'
111 111 res.headers[b'Content-Type'] = b'text/plain'
112 112 res.setbodybytes(_('client MUST specify Accept header with value: %s\n')
113 113 % FRAMINGTYPE)
114 114 return
115 115
116 116 if req.headers.get(b'Content-Type') != FRAMINGTYPE:
117 117 res.status = b'415 Unsupported Media Type'
118 118 # TODO we should send a response with appropriate media type,
119 119 # since client does Accept it.
120 120 res.headers[b'Content-Type'] = b'text/plain'
121 121 res.setbodybytes(_('client MUST send Content-Type header with '
122 122 'value: %s\n') % FRAMINGTYPE)
123 123 return
124 124
125 125 _processhttpv2request(ui, repo, req, res, permission, command, proto)
126 126
127 127 def _processhttpv2reflectrequest(ui, repo, req, res):
128 128 """Reads unified frame protocol request and dumps out state to client.
129 129
130 130 This special endpoint can be used to help debug the wire protocol.
131 131
132 132 Instead of routing the request through the normal dispatch mechanism,
133 133 we instead read all frames, decode them, and feed them into our state
134 134 tracker. We then dump the log of all that activity back out to the
135 135 client.
136 136 """
137 137 import json
138 138
139 139 # Reflection APIs have a history of being abused, accidentally disclosing
140 140 # sensitive data, etc. So we have a config knob.
141 141 if not ui.configbool('experimental', 'web.api.debugreflect'):
142 142 res.status = b'404 Not Found'
143 143 res.headers[b'Content-Type'] = b'text/plain'
144 144 res.setbodybytes(_('debugreflect service not available'))
145 145 return
146 146
147 147 # We assume we have a unified framing protocol request body.
148 148
149 149 reactor = wireprotoframing.serverreactor()
150 150 states = []
151 151
152 152 while True:
153 153 frame = wireprotoframing.readframe(req.bodyfh)
154 154
155 155 if not frame:
156 156 states.append(b'received: <no frame>')
157 157 break
158 158
159 159 states.append(b'received: %d %d %d %s' % (frame.typeid, frame.flags,
160 160 frame.requestid,
161 161 frame.payload))
162 162
163 163 action, meta = reactor.onframerecv(frame)
164 164 states.append(json.dumps((action, meta), sort_keys=True,
165 165 separators=(', ', ': ')))
166 166
167 167 action, meta = reactor.oninputeof()
168 168 meta['action'] = action
169 169 states.append(json.dumps(meta, sort_keys=True, separators=(', ',': ')))
170 170
171 171 res.status = b'200 OK'
172 172 res.headers[b'Content-Type'] = b'text/plain'
173 173 res.setbodybytes(b'\n'.join(states))
174 174
175 175 def _processhttpv2request(ui, repo, req, res, authedperm, reqcommand, proto):
176 176 """Post-validation handler for HTTPv2 requests.
177 177
178 178 Called when the HTTP request contains unified frame-based protocol
179 179 frames for evaluation.
180 180 """
181 181 # TODO Some HTTP clients are full duplex and can receive data before
182 182 # the entire request is transmitted. Figure out a way to indicate support
183 183 # for that so we can opt into full duplex mode.
184 184 reactor = wireprotoframing.serverreactor(deferoutput=True)
185 185 seencommand = False
186 186
187 187 outstream = reactor.makeoutputstream()
188 188
189 189 while True:
190 190 frame = wireprotoframing.readframe(req.bodyfh)
191 191 if not frame:
192 192 break
193 193
194 194 action, meta = reactor.onframerecv(frame)
195 195
196 196 if action == 'wantframe':
197 197 # Need more data before we can do anything.
198 198 continue
199 199 elif action == 'runcommand':
200 200 sentoutput = _httpv2runcommand(ui, repo, req, res, authedperm,
201 201 reqcommand, reactor, outstream,
202 202 meta, issubsequent=seencommand)
203 203
204 204 if sentoutput:
205 205 return
206 206
207 207 seencommand = True
208 208
209 209 elif action == 'error':
210 210 # TODO define proper error mechanism.
211 211 res.status = b'200 OK'
212 212 res.headers[b'Content-Type'] = b'text/plain'
213 213 res.setbodybytes(meta['message'] + b'\n')
214 214 return
215 215 else:
216 216 raise error.ProgrammingError(
217 217 'unhandled action from frame processor: %s' % action)
218 218
219 219 action, meta = reactor.oninputeof()
220 220 if action == 'sendframes':
221 221 # We assume we haven't started sending the response yet. If we're
222 222 # wrong, the response type will raise an exception.
223 223 res.status = b'200 OK'
224 224 res.headers[b'Content-Type'] = FRAMINGTYPE
225 225 res.setbodygen(meta['framegen'])
226 226 elif action == 'noop':
227 227 pass
228 228 else:
229 229 raise error.ProgrammingError('unhandled action from frame processor: %s'
230 230 % action)
231 231
232 232 def _httpv2runcommand(ui, repo, req, res, authedperm, reqcommand, reactor,
233 233 outstream, command, issubsequent):
234 234 """Dispatch a wire protocol command made from HTTPv2 requests.
235 235
236 236 The authenticated permission (``authedperm``) along with the original
237 237 command from the URL (``reqcommand``) are passed in.
238 238 """
239 239 # We already validated that the session has permissions to perform the
240 240 # actions in ``authedperm``. In the unified frame protocol, the canonical
241 241 # command to run is expressed in a frame. However, the URL also requested
242 242 # to run a specific command. We need to be careful that the command we
243 243 # run doesn't have permissions requirements greater than what was granted
244 244 # by ``authedperm``.
245 245 #
246 246 # Our rule for this is we only allow one command per HTTP request and
247 247 # that command must match the command in the URL. However, we make
248 248 # an exception for the ``multirequest`` URL. This URL is allowed to
249 249 # execute multiple commands. We double check permissions of each command
250 250 # as it is invoked to ensure there is no privilege escalation.
251 251 # TODO consider allowing multiple commands to regular command URLs
252 252 # iff each command is the same.
253 253
254 254 proto = httpv2protocolhandler(req, ui, args=command['args'])
255 255
256 256 if reqcommand == b'multirequest':
257 257 if not wireproto.commandsv2.commandavailable(command['command'], proto):
258 258 # TODO proper error mechanism
259 259 res.status = b'200 OK'
260 260 res.headers[b'Content-Type'] = b'text/plain'
261 261 res.setbodybytes(_('wire protocol command not available: %s') %
262 262 command['command'])
263 263 return True
264 264
265 265 # TODO don't use assert here, since it may be elided by -O.
266 266 assert authedperm in (b'ro', b'rw')
267 267 wirecommand = wireproto.commandsv2[command['command']]
268 268 assert wirecommand.permission in ('push', 'pull')
269 269
270 270 if authedperm == b'ro' and wirecommand.permission != 'pull':
271 271 # TODO proper error mechanism
272 272 res.status = b'403 Forbidden'
273 273 res.headers[b'Content-Type'] = b'text/plain'
274 274 res.setbodybytes(_('insufficient permissions to execute '
275 275 'command: %s') % command['command'])
276 276 return True
277 277
278 278 # TODO should we also call checkperm() here? Maybe not if we're going
279 279 # to overhaul that API. The granted scope from the URL check should
280 280 # be good enough.
281 281
282 282 else:
283 283 # Don't allow multiple commands outside of ``multirequest`` URL.
284 284 if issubsequent:
285 285 # TODO proper error mechanism
286 286 res.status = b'200 OK'
287 287 res.headers[b'Content-Type'] = b'text/plain'
288 288 res.setbodybytes(_('multiple commands cannot be issued to this '
289 289 'URL'))
290 290 return True
291 291
292 292 if reqcommand != command['command']:
293 293 # TODO define proper error mechanism
294 294 res.status = b'200 OK'
295 295 res.headers[b'Content-Type'] = b'text/plain'
296 296 res.setbodybytes(_('command in frame must match command in URL'))
297 297 return True
298 298
299 299 rsp = wireproto.dispatch(repo, proto, command['command'])
300 300
301 301 res.status = b'200 OK'
302 302 res.headers[b'Content-Type'] = FRAMINGTYPE
303 303
304 304 if isinstance(rsp, wireprototypes.cborresponse):
305 305 encoded = cbor.dumps(rsp.value, canonical=True)
306 306 action, meta = reactor.oncommandresponseready(outstream,
307 307 command['requestid'],
308 308 encoded)
309 309 elif isinstance(rsp, wireprototypes.v2streamingresponse):
310 310 action, meta = reactor.oncommandresponsereadygen(outstream,
311 311 command['requestid'],
312 312 rsp.gen)
313 313 elif isinstance(rsp, wireprototypes.v2errorresponse):
314 314 action, meta = reactor.oncommanderror(outstream,
315 315 command['requestid'],
316 316 rsp.message,
317 317 rsp.args)
318 318 else:
319 319 action, meta = reactor.onservererror(
320 320 _('unhandled response type from wire proto command'))
321 321
322 322 if action == 'sendframes':
323 323 res.setbodygen(meta['framegen'])
324 324 return True
325 325 elif action == 'noop':
326 326 return False
327 327 else:
328 328 raise error.ProgrammingError('unhandled event from reactor: %s' %
329 329 action)
330 330
331 331 @zi.implementer(wireprototypes.baseprotocolhandler)
332 332 class httpv2protocolhandler(object):
333 333 def __init__(self, req, ui, args=None):
334 334 self._req = req
335 335 self._ui = ui
336 336 self._args = args
337 337
338 338 @property
339 339 def name(self):
340 340 return HTTP_WIREPROTO_V2
341 341
342 342 def getargs(self, args):
343 343 data = {}
344 344 for k, typ in args.items():
345 345 if k == '*':
346 346 raise NotImplementedError('do not support * args')
347 347 elif k in self._args:
348 348 # TODO consider validating value types.
349 349 data[k] = self._args[k]
350 350
351 351 return data
352 352
353 353 def getprotocaps(self):
354 354 # Protocol capabilities are currently not implemented for HTTP V2.
355 355 return set()
356 356
357 357 def getpayload(self):
358 358 raise NotImplementedError
359 359
360 360 @contextlib.contextmanager
361 361 def mayberedirectstdio(self):
362 362 raise NotImplementedError
363 363
364 364 def client(self):
365 365 raise NotImplementedError
366 366
367 367 def addcapabilities(self, repo, caps):
368 368 return caps
369 369
370 370 def checkperm(self, perm):
371 371 raise NotImplementedError
372 372
373 373 def httpv2apidescriptor(req, repo):
374 374 proto = httpv2protocolhandler(req, repo.ui)
375 375
376 376 return _capabilitiesv2(repo, proto)
377 377
378 378 def _capabilitiesv2(repo, proto):
379 379 """Obtain the set of capabilities for version 2 transports.
380 380
381 381 These capabilities are distinct from the capabilities for version 1
382 382 transports.
383 383 """
384 384 compression = []
385 385 for engine in wireproto.supportedcompengines(repo.ui, util.SERVERROLE):
386 386 compression.append({
387 387 b'name': engine.wireprotosupport().name,
388 388 })
389 389
390 390 caps = {
391 391 'commands': {},
392 392 'compression': compression,
393 393 'framingmediatypes': [FRAMINGTYPE],
394 394 }
395 395
396 396 for command, entry in wireproto.commandsv2.items():
397 397 caps['commands'][command] = {
398 398 'args': entry.args,
399 399 'permissions': [entry.permission],
400 400 }
401 401
402 402 if streamclone.allowservergeneration(repo):
403 403 caps['rawrepoformats'] = sorted(repo.requirements &
404 404 repo.supportedformats)
405 405
406 406 return proto.addcapabilities(repo, caps)
407 407
408 def wireprotocommand(*args, **kwargs):
408 def wireprotocommand(name, args=None, permission='push'):
409 """Decorator to declare a wire protocol command.
410
411 ``name`` is the name of the wire protocol command being provided.
412
413 ``args`` is a dict of argument names to example values.
414
415 ``permission`` defines the permission type needed to run this command.
416 Can be ``push`` or ``pull``. These roughly map to read-write and read-only,
417 respectively. Default is to assume command requires ``push`` permissions
418 because otherwise commands not declaring their permissions could modify
419 a repository that is supposed to be read-only.
420 """
421 transports = {k for k, v in wireprototypes.TRANSPORTS.items()
422 if v['version'] == 2}
423
424 if permission not in ('push', 'pull'):
425 raise error.ProgrammingError('invalid wire protocol permission; '
426 'got %s; expected "push" or "pull"' %
427 permission)
428
429 if args is None:
430 args = {}
431
432 if not isinstance(args, dict):
433 raise error.ProgrammingError('arguments for version 2 commands '
434 'must be declared as dicts')
435
409 436 def register(func):
410 return wireproto.wireprotocommand(
411 *args, transportpolicy=wireproto.POLICY_V2_ONLY, **kwargs)(func)
437 if name in wireproto.commandsv2:
438 raise error.ProgrammingError('%s command already registered '
439 'for version 2' % name)
440
441 wireproto.commandsv2[name] = wireproto.commandentry(
442 func, args=args, transports=transports, permission=permission)
443
444 return func
412 445
413 446 return register
414 447
415 448 @wireprotocommand('branchmap', permission='pull')
416 449 def branchmapv2(repo, proto):
417 450 branchmap = {encoding.fromlocal(k): v
418 451 for k, v in repo.branchmap().iteritems()}
419 452
420 453 return wireprototypes.cborresponse(branchmap)
421 454
422 455 @wireprotocommand('capabilities', permission='pull')
423 456 def capabilitiesv2(repo, proto):
424 457 caps = _capabilitiesv2(repo, proto)
425 458
426 459 return wireprototypes.cborresponse(caps)
427 460
428 461 @wireprotocommand('heads',
429 462 args={
430 463 'publiconly': False,
431 464 },
432 465 permission='pull')
433 466 def headsv2(repo, proto, publiconly=False):
434 467 if publiconly:
435 468 repo = repo.filtered('immutable')
436 469
437 470 return wireprototypes.cborresponse(repo.heads())
438 471
439 472 @wireprotocommand('known',
440 473 args={
441 474 'nodes': [b'deadbeef'],
442 475 },
443 476 permission='pull')
444 477 def knownv2(repo, proto, nodes=None):
445 478 nodes = nodes or []
446 479 result = b''.join(b'1' if n else b'0' for n in repo.known(nodes))
447 480 return wireprototypes.cborresponse(result)
448 481
449 482 @wireprotocommand('listkeys',
450 483 args={
451 484 'namespace': b'ns',
452 485 },
453 486 permission='pull')
454 487 def listkeysv2(repo, proto, namespace=None):
455 488 keys = repo.listkeys(encoding.tolocal(namespace))
456 489 keys = {encoding.fromlocal(k): encoding.fromlocal(v)
457 490 for k, v in keys.iteritems()}
458 491
459 492 return wireprototypes.cborresponse(keys)
460 493
461 494 @wireprotocommand('lookup',
462 495 args={
463 496 'key': b'foo',
464 497 },
465 498 permission='pull')
466 499 def lookupv2(repo, proto, key):
467 500 key = encoding.tolocal(key)
468 501
469 502 # TODO handle exception.
470 503 node = repo.lookup(key)
471 504
472 505 return wireprototypes.cborresponse(node)
473 506
474 507 @wireprotocommand('pushkey',
475 508 args={
476 509 'namespace': b'ns',
477 510 'key': b'key',
478 511 'old': b'old',
479 512 'new': b'new',
480 513 },
481 514 permission='push')
482 515 def pushkeyv2(repo, proto, namespace, key, old, new):
483 516 # TODO handle ui output redirection
484 517 r = repo.pushkey(encoding.tolocal(namespace),
485 518 encoding.tolocal(key),
486 519 encoding.tolocal(old),
487 520 encoding.tolocal(new))
488 521
489 522 return wireprototypes.cborresponse(r)
@@ -1,59 +1,58
1 1 HTTPV2=exp-http-v2-0001
2 2 MEDIATYPE=application/mercurial-exp-framing-0005
3 3
4 4 sendhttpraw() {
5 5 hg --verbose debugwireproto --peer raw http://$LOCALIP:$HGPORT/
6 6 }
7 7
8 8 sendhttpv2peer() {
9 9 hg --verbose debugwireproto --nologhandshake --peer http2 http://$LOCALIP:$HGPORT/
10 10 }
11 11
12 12 sendhttpv2peerhandshake() {
13 13 hg --verbose debugwireproto --peer http2 http://$LOCALIP:$HGPORT/
14 14 }
15 15
16 16 cat > dummycommands.py << EOF
17 17 from mercurial import (
18 18 wireprototypes,
19 wireprotov2server,
19 20 wireproto,
20 21 )
21 22
22 23 @wireproto.wireprotocommand('customreadonly', permission='pull')
23 24 def customreadonlyv1(repo, proto):
24 25 return wireprototypes.bytesresponse(b'customreadonly bytes response')
25 26
26 @wireproto.wireprotocommand('customreadonly', permission='pull',
27 transportpolicy=wireproto.POLICY_V2_ONLY)
27 @wireprotov2server.wireprotocommand('customreadonly', permission='pull')
28 28 def customreadonlyv2(repo, proto):
29 29 return wireprototypes.cborresponse(b'customreadonly bytes response')
30 30
31 31 @wireproto.wireprotocommand('customreadwrite', permission='push')
32 32 def customreadwrite(repo, proto):
33 33 return wireprototypes.bytesresponse(b'customreadwrite bytes response')
34 34
35 @wireproto.wireprotocommand('customreadwrite', permission='push',
36 transportpolicy=wireproto.POLICY_V2_ONLY)
35 @wireprotov2server.wireprotocommand('customreadwrite', permission='push')
37 36 def customreadwritev2(repo, proto):
38 37 return wireprototypes.cborresponse(b'customreadwrite bytes response')
39 38 EOF
40 39
41 40 cat >> $HGRCPATH << EOF
42 41 [extensions]
43 42 drawdag = $TESTDIR/drawdag.py
44 43 EOF
45 44
46 45 enabledummycommands() {
47 46 cat >> $HGRCPATH << EOF
48 47 [extensions]
49 48 dummycommands = $TESTTMP/dummycommands.py
50 49 EOF
51 50 }
52 51
53 52 enablehttpv2() {
54 53 cat >> $1/.hg/hgrc << EOF
55 54 [experimental]
56 55 web.apiserver = true
57 56 web.api.http-v2 = true
58 57 EOF
59 58 }
General Comments 0
You need to be logged in to leave comments. Login now