##// END OF EJS Templates
wireprotov2: extract file object emission to own function...
Gregory Szorc -
r40213:41e2633b default
parent child Browse files
Show More
@@ -1,1227 +1,1231
1 1 # Copyright 21 May 2005 - (c) 2005 Jake Edge <jake@edge2.net>
2 2 # Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
3 3 #
4 4 # This software may be used and distributed according to the terms of the
5 5 # GNU General Public License version 2 or any later version.
6 6
7 7 from __future__ import absolute_import
8 8
9 9 import contextlib
10 10 import hashlib
11 11
12 12 from .i18n import _
13 13 from .node import (
14 14 hex,
15 15 nullid,
16 16 )
17 17 from . import (
18 18 discovery,
19 19 encoding,
20 20 error,
21 21 narrowspec,
22 22 pycompat,
23 23 wireprotoframing,
24 24 wireprototypes,
25 25 )
26 26 from .utils import (
27 27 cborutil,
28 28 interfaceutil,
29 29 stringutil,
30 30 )
31 31
32 32 FRAMINGTYPE = b'application/mercurial-exp-framing-0006'
33 33
34 34 HTTP_WIREPROTO_V2 = wireprototypes.HTTP_WIREPROTO_V2
35 35
36 36 COMMANDS = wireprototypes.commanddict()
37 37
38 38 # Value inserted into cache key computation function. Change the value to
39 39 # force new cache keys for every command request. This should be done when
40 40 # there is a change to how caching works, etc.
41 41 GLOBAL_CACHE_VERSION = 1
42 42
43 43 def handlehttpv2request(rctx, req, res, checkperm, urlparts):
44 44 from .hgweb import common as hgwebcommon
45 45
46 46 # URL space looks like: <permissions>/<command>, where <permission> can
47 47 # be ``ro`` or ``rw`` to signal read-only or read-write, respectively.
48 48
49 49 # Root URL does nothing meaningful... yet.
50 50 if not urlparts:
51 51 res.status = b'200 OK'
52 52 res.headers[b'Content-Type'] = b'text/plain'
53 53 res.setbodybytes(_('HTTP version 2 API handler'))
54 54 return
55 55
56 56 if len(urlparts) == 1:
57 57 res.status = b'404 Not Found'
58 58 res.headers[b'Content-Type'] = b'text/plain'
59 59 res.setbodybytes(_('do not know how to process %s\n') %
60 60 req.dispatchpath)
61 61 return
62 62
63 63 permission, command = urlparts[0:2]
64 64
65 65 if permission not in (b'ro', b'rw'):
66 66 res.status = b'404 Not Found'
67 67 res.headers[b'Content-Type'] = b'text/plain'
68 68 res.setbodybytes(_('unknown permission: %s') % permission)
69 69 return
70 70
71 71 if req.method != 'POST':
72 72 res.status = b'405 Method Not Allowed'
73 73 res.headers[b'Allow'] = b'POST'
74 74 res.setbodybytes(_('commands require POST requests'))
75 75 return
76 76
77 77 # At some point we'll want to use our own API instead of recycling the
78 78 # behavior of version 1 of the wire protocol...
79 79 # TODO return reasonable responses - not responses that overload the
80 80 # HTTP status line message for error reporting.
81 81 try:
82 82 checkperm(rctx, req, 'pull' if permission == b'ro' else 'push')
83 83 except hgwebcommon.ErrorResponse as e:
84 84 res.status = hgwebcommon.statusmessage(e.code, pycompat.bytestr(e))
85 85 for k, v in e.headers:
86 86 res.headers[k] = v
87 87 res.setbodybytes('permission denied')
88 88 return
89 89
90 90 # We have a special endpoint to reflect the request back at the client.
91 91 if command == b'debugreflect':
92 92 _processhttpv2reflectrequest(rctx.repo.ui, rctx.repo, req, res)
93 93 return
94 94
95 95 # Extra commands that we handle that aren't really wire protocol
96 96 # commands. Think extra hard before making this hackery available to
97 97 # extension.
98 98 extracommands = {'multirequest'}
99 99
100 100 if command not in COMMANDS and command not in extracommands:
101 101 res.status = b'404 Not Found'
102 102 res.headers[b'Content-Type'] = b'text/plain'
103 103 res.setbodybytes(_('unknown wire protocol command: %s\n') % command)
104 104 return
105 105
106 106 repo = rctx.repo
107 107 ui = repo.ui
108 108
109 109 proto = httpv2protocolhandler(req, ui)
110 110
111 111 if (not COMMANDS.commandavailable(command, proto)
112 112 and command not in extracommands):
113 113 res.status = b'404 Not Found'
114 114 res.headers[b'Content-Type'] = b'text/plain'
115 115 res.setbodybytes(_('invalid wire protocol command: %s') % command)
116 116 return
117 117
118 118 # TODO consider cases where proxies may add additional Accept headers.
119 119 if req.headers.get(b'Accept') != FRAMINGTYPE:
120 120 res.status = b'406 Not Acceptable'
121 121 res.headers[b'Content-Type'] = b'text/plain'
122 122 res.setbodybytes(_('client MUST specify Accept header with value: %s\n')
123 123 % FRAMINGTYPE)
124 124 return
125 125
126 126 if req.headers.get(b'Content-Type') != FRAMINGTYPE:
127 127 res.status = b'415 Unsupported Media Type'
128 128 # TODO we should send a response with appropriate media type,
129 129 # since client does Accept it.
130 130 res.headers[b'Content-Type'] = b'text/plain'
131 131 res.setbodybytes(_('client MUST send Content-Type header with '
132 132 'value: %s\n') % FRAMINGTYPE)
133 133 return
134 134
135 135 _processhttpv2request(ui, repo, req, res, permission, command, proto)
136 136
137 137 def _processhttpv2reflectrequest(ui, repo, req, res):
138 138 """Reads unified frame protocol request and dumps out state to client.
139 139
140 140 This special endpoint can be used to help debug the wire protocol.
141 141
142 142 Instead of routing the request through the normal dispatch mechanism,
143 143 we instead read all frames, decode them, and feed them into our state
144 144 tracker. We then dump the log of all that activity back out to the
145 145 client.
146 146 """
147 147 import json
148 148
149 149 # Reflection APIs have a history of being abused, accidentally disclosing
150 150 # sensitive data, etc. So we have a config knob.
151 151 if not ui.configbool('experimental', 'web.api.debugreflect'):
152 152 res.status = b'404 Not Found'
153 153 res.headers[b'Content-Type'] = b'text/plain'
154 154 res.setbodybytes(_('debugreflect service not available'))
155 155 return
156 156
157 157 # We assume we have a unified framing protocol request body.
158 158
159 159 reactor = wireprotoframing.serverreactor(ui)
160 160 states = []
161 161
162 162 while True:
163 163 frame = wireprotoframing.readframe(req.bodyfh)
164 164
165 165 if not frame:
166 166 states.append(b'received: <no frame>')
167 167 break
168 168
169 169 states.append(b'received: %d %d %d %s' % (frame.typeid, frame.flags,
170 170 frame.requestid,
171 171 frame.payload))
172 172
173 173 action, meta = reactor.onframerecv(frame)
174 174 states.append(json.dumps((action, meta), sort_keys=True,
175 175 separators=(', ', ': ')))
176 176
177 177 action, meta = reactor.oninputeof()
178 178 meta['action'] = action
179 179 states.append(json.dumps(meta, sort_keys=True, separators=(', ',': ')))
180 180
181 181 res.status = b'200 OK'
182 182 res.headers[b'Content-Type'] = b'text/plain'
183 183 res.setbodybytes(b'\n'.join(states))
184 184
185 185 def _processhttpv2request(ui, repo, req, res, authedperm, reqcommand, proto):
186 186 """Post-validation handler for HTTPv2 requests.
187 187
188 188 Called when the HTTP request contains unified frame-based protocol
189 189 frames for evaluation.
190 190 """
191 191 # TODO Some HTTP clients are full duplex and can receive data before
192 192 # the entire request is transmitted. Figure out a way to indicate support
193 193 # for that so we can opt into full duplex mode.
194 194 reactor = wireprotoframing.serverreactor(ui, deferoutput=True)
195 195 seencommand = False
196 196
197 197 outstream = None
198 198
199 199 while True:
200 200 frame = wireprotoframing.readframe(req.bodyfh)
201 201 if not frame:
202 202 break
203 203
204 204 action, meta = reactor.onframerecv(frame)
205 205
206 206 if action == 'wantframe':
207 207 # Need more data before we can do anything.
208 208 continue
209 209 elif action == 'runcommand':
210 210 # Defer creating output stream because we need to wait for
211 211 # protocol settings frames so proper encoding can be applied.
212 212 if not outstream:
213 213 outstream = reactor.makeoutputstream()
214 214
215 215 sentoutput = _httpv2runcommand(ui, repo, req, res, authedperm,
216 216 reqcommand, reactor, outstream,
217 217 meta, issubsequent=seencommand)
218 218
219 219 if sentoutput:
220 220 return
221 221
222 222 seencommand = True
223 223
224 224 elif action == 'error':
225 225 # TODO define proper error mechanism.
226 226 res.status = b'200 OK'
227 227 res.headers[b'Content-Type'] = b'text/plain'
228 228 res.setbodybytes(meta['message'] + b'\n')
229 229 return
230 230 else:
231 231 raise error.ProgrammingError(
232 232 'unhandled action from frame processor: %s' % action)
233 233
234 234 action, meta = reactor.oninputeof()
235 235 if action == 'sendframes':
236 236 # We assume we haven't started sending the response yet. If we're
237 237 # wrong, the response type will raise an exception.
238 238 res.status = b'200 OK'
239 239 res.headers[b'Content-Type'] = FRAMINGTYPE
240 240 res.setbodygen(meta['framegen'])
241 241 elif action == 'noop':
242 242 pass
243 243 else:
244 244 raise error.ProgrammingError('unhandled action from frame processor: %s'
245 245 % action)
246 246
247 247 def _httpv2runcommand(ui, repo, req, res, authedperm, reqcommand, reactor,
248 248 outstream, command, issubsequent):
249 249 """Dispatch a wire protocol command made from HTTPv2 requests.
250 250
251 251 The authenticated permission (``authedperm``) along with the original
252 252 command from the URL (``reqcommand``) are passed in.
253 253 """
254 254 # We already validated that the session has permissions to perform the
255 255 # actions in ``authedperm``. In the unified frame protocol, the canonical
256 256 # command to run is expressed in a frame. However, the URL also requested
257 257 # to run a specific command. We need to be careful that the command we
258 258 # run doesn't have permissions requirements greater than what was granted
259 259 # by ``authedperm``.
260 260 #
261 261 # Our rule for this is we only allow one command per HTTP request and
262 262 # that command must match the command in the URL. However, we make
263 263 # an exception for the ``multirequest`` URL. This URL is allowed to
264 264 # execute multiple commands. We double check permissions of each command
265 265 # as it is invoked to ensure there is no privilege escalation.
266 266 # TODO consider allowing multiple commands to regular command URLs
267 267 # iff each command is the same.
268 268
269 269 proto = httpv2protocolhandler(req, ui, args=command['args'])
270 270
271 271 if reqcommand == b'multirequest':
272 272 if not COMMANDS.commandavailable(command['command'], proto):
273 273 # TODO proper error mechanism
274 274 res.status = b'200 OK'
275 275 res.headers[b'Content-Type'] = b'text/plain'
276 276 res.setbodybytes(_('wire protocol command not available: %s') %
277 277 command['command'])
278 278 return True
279 279
280 280 # TODO don't use assert here, since it may be elided by -O.
281 281 assert authedperm in (b'ro', b'rw')
282 282 wirecommand = COMMANDS[command['command']]
283 283 assert wirecommand.permission in ('push', 'pull')
284 284
285 285 if authedperm == b'ro' and wirecommand.permission != 'pull':
286 286 # TODO proper error mechanism
287 287 res.status = b'403 Forbidden'
288 288 res.headers[b'Content-Type'] = b'text/plain'
289 289 res.setbodybytes(_('insufficient permissions to execute '
290 290 'command: %s') % command['command'])
291 291 return True
292 292
293 293 # TODO should we also call checkperm() here? Maybe not if we're going
294 294 # to overhaul that API. The granted scope from the URL check should
295 295 # be good enough.
296 296
297 297 else:
298 298 # Don't allow multiple commands outside of ``multirequest`` URL.
299 299 if issubsequent:
300 300 # TODO proper error mechanism
301 301 res.status = b'200 OK'
302 302 res.headers[b'Content-Type'] = b'text/plain'
303 303 res.setbodybytes(_('multiple commands cannot be issued to this '
304 304 'URL'))
305 305 return True
306 306
307 307 if reqcommand != command['command']:
308 308 # TODO define proper error mechanism
309 309 res.status = b'200 OK'
310 310 res.headers[b'Content-Type'] = b'text/plain'
311 311 res.setbodybytes(_('command in frame must match command in URL'))
312 312 return True
313 313
314 314 res.status = b'200 OK'
315 315 res.headers[b'Content-Type'] = FRAMINGTYPE
316 316
317 317 try:
318 318 objs = dispatch(repo, proto, command['command'], command['redirect'])
319 319
320 320 action, meta = reactor.oncommandresponsereadyobjects(
321 321 outstream, command['requestid'], objs)
322 322
323 323 except error.WireprotoCommandError as e:
324 324 action, meta = reactor.oncommanderror(
325 325 outstream, command['requestid'], e.message, e.messageargs)
326 326
327 327 except Exception as e:
328 328 action, meta = reactor.onservererror(
329 329 outstream, command['requestid'],
330 330 _('exception when invoking command: %s') %
331 331 stringutil.forcebytestr(e))
332 332
333 333 if action == 'sendframes':
334 334 res.setbodygen(meta['framegen'])
335 335 return True
336 336 elif action == 'noop':
337 337 return False
338 338 else:
339 339 raise error.ProgrammingError('unhandled event from reactor: %s' %
340 340 action)
341 341
342 342 def getdispatchrepo(repo, proto, command):
343 343 return repo.filtered('served')
344 344
345 345 def dispatch(repo, proto, command, redirect):
346 346 """Run a wire protocol command.
347 347
348 348 Returns an iterable of objects that will be sent to the client.
349 349 """
350 350 repo = getdispatchrepo(repo, proto, command)
351 351
352 352 entry = COMMANDS[command]
353 353 func = entry.func
354 354 spec = entry.args
355 355
356 356 args = proto.getargs(spec)
357 357
358 358 # There is some duplicate boilerplate code here for calling the command and
359 359 # emitting objects. It is either that or a lot of indented code that looks
360 360 # like a pyramid (since there are a lot of code paths that result in not
361 361 # using the cacher).
362 362 callcommand = lambda: func(repo, proto, **pycompat.strkwargs(args))
363 363
364 364 # Request is not cacheable. Don't bother instantiating a cacher.
365 365 if not entry.cachekeyfn:
366 366 for o in callcommand():
367 367 yield o
368 368 return
369 369
370 370 if redirect:
371 371 redirecttargets = redirect[b'targets']
372 372 redirecthashes = redirect[b'hashes']
373 373 else:
374 374 redirecttargets = []
375 375 redirecthashes = []
376 376
377 377 cacher = makeresponsecacher(repo, proto, command, args,
378 378 cborutil.streamencode,
379 379 redirecttargets=redirecttargets,
380 380 redirecthashes=redirecthashes)
381 381
382 382 # But we have no cacher. Do default handling.
383 383 if not cacher:
384 384 for o in callcommand():
385 385 yield o
386 386 return
387 387
388 388 with cacher:
389 389 cachekey = entry.cachekeyfn(repo, proto, cacher, **args)
390 390
391 391 # No cache key or the cacher doesn't like it. Do default handling.
392 392 if cachekey is None or not cacher.setcachekey(cachekey):
393 393 for o in callcommand():
394 394 yield o
395 395 return
396 396
397 397 # Serve it from the cache, if possible.
398 398 cached = cacher.lookup()
399 399
400 400 if cached:
401 401 for o in cached['objs']:
402 402 yield o
403 403 return
404 404
405 405 # Else call the command and feed its output into the cacher, allowing
406 406 # the cacher to buffer/mutate objects as it desires.
407 407 for o in callcommand():
408 408 for o in cacher.onobject(o):
409 409 yield o
410 410
411 411 for o in cacher.onfinished():
412 412 yield o
413 413
414 414 @interfaceutil.implementer(wireprototypes.baseprotocolhandler)
415 415 class httpv2protocolhandler(object):
416 416 def __init__(self, req, ui, args=None):
417 417 self._req = req
418 418 self._ui = ui
419 419 self._args = args
420 420
421 421 @property
422 422 def name(self):
423 423 return HTTP_WIREPROTO_V2
424 424
425 425 def getargs(self, args):
426 426 # First look for args that were passed but aren't registered on this
427 427 # command.
428 428 extra = set(self._args) - set(args)
429 429 if extra:
430 430 raise error.WireprotoCommandError(
431 431 'unsupported argument to command: %s' %
432 432 ', '.join(sorted(extra)))
433 433
434 434 # And look for required arguments that are missing.
435 435 missing = {a for a in args if args[a]['required']} - set(self._args)
436 436
437 437 if missing:
438 438 raise error.WireprotoCommandError(
439 439 'missing required arguments: %s' % ', '.join(sorted(missing)))
440 440
441 441 # Now derive the arguments to pass to the command, taking into
442 442 # account the arguments specified by the client.
443 443 data = {}
444 444 for k, meta in sorted(args.items()):
445 445 # This argument wasn't passed by the client.
446 446 if k not in self._args:
447 447 data[k] = meta['default']()
448 448 continue
449 449
450 450 v = self._args[k]
451 451
452 452 # Sets may be expressed as lists. Silently normalize.
453 453 if meta['type'] == 'set' and isinstance(v, list):
454 454 v = set(v)
455 455
456 456 # TODO consider more/stronger type validation.
457 457
458 458 data[k] = v
459 459
460 460 return data
461 461
462 462 def getprotocaps(self):
463 463 # Protocol capabilities are currently not implemented for HTTP V2.
464 464 return set()
465 465
466 466 def getpayload(self):
467 467 raise NotImplementedError
468 468
469 469 @contextlib.contextmanager
470 470 def mayberedirectstdio(self):
471 471 raise NotImplementedError
472 472
473 473 def client(self):
474 474 raise NotImplementedError
475 475
476 476 def addcapabilities(self, repo, caps):
477 477 return caps
478 478
479 479 def checkperm(self, perm):
480 480 raise NotImplementedError
481 481
482 482 def httpv2apidescriptor(req, repo):
483 483 proto = httpv2protocolhandler(req, repo.ui)
484 484
485 485 return _capabilitiesv2(repo, proto)
486 486
487 487 def _capabilitiesv2(repo, proto):
488 488 """Obtain the set of capabilities for version 2 transports.
489 489
490 490 These capabilities are distinct from the capabilities for version 1
491 491 transports.
492 492 """
493 493 caps = {
494 494 'commands': {},
495 495 'framingmediatypes': [FRAMINGTYPE],
496 496 'pathfilterprefixes': set(narrowspec.VALID_PREFIXES),
497 497 }
498 498
499 499 for command, entry in COMMANDS.items():
500 500 args = {}
501 501
502 502 for arg, meta in entry.args.items():
503 503 args[arg] = {
504 504 # TODO should this be a normalized type using CBOR's
505 505 # terminology?
506 506 b'type': meta['type'],
507 507 b'required': meta['required'],
508 508 }
509 509
510 510 if not meta['required']:
511 511 args[arg][b'default'] = meta['default']()
512 512
513 513 if meta['validvalues']:
514 514 args[arg][b'validvalues'] = meta['validvalues']
515 515
516 516 caps['commands'][command] = {
517 517 'args': args,
518 518 'permissions': [entry.permission],
519 519 }
520 520
521 521 if entry.extracapabilitiesfn:
522 522 extracaps = entry.extracapabilitiesfn(repo, proto)
523 523 caps['commands'][command].update(extracaps)
524 524
525 525 caps['rawrepoformats'] = sorted(repo.requirements &
526 526 repo.supportedformats)
527 527
528 528 targets = getadvertisedredirecttargets(repo, proto)
529 529 if targets:
530 530 caps[b'redirect'] = {
531 531 b'targets': [],
532 532 b'hashes': [b'sha256', b'sha1'],
533 533 }
534 534
535 535 for target in targets:
536 536 entry = {
537 537 b'name': target['name'],
538 538 b'protocol': target['protocol'],
539 539 b'uris': target['uris'],
540 540 }
541 541
542 542 for key in ('snirequired', 'tlsversions'):
543 543 if key in target:
544 544 entry[key] = target[key]
545 545
546 546 caps[b'redirect'][b'targets'].append(entry)
547 547
548 548 return proto.addcapabilities(repo, caps)
549 549
550 550 def getadvertisedredirecttargets(repo, proto):
551 551 """Obtain a list of content redirect targets.
552 552
553 553 Returns a list containing potential redirect targets that will be
554 554 advertised in capabilities data. Each dict MUST have the following
555 555 keys:
556 556
557 557 name
558 558 The name of this redirect target. This is the identifier clients use
559 559 to refer to a target. It is transferred as part of every command
560 560 request.
561 561
562 562 protocol
563 563 Network protocol used by this target. Typically this is the string
564 564 in front of the ``://`` in a URL. e.g. ``https``.
565 565
566 566 uris
567 567 List of representative URIs for this target. Clients can use the
568 568 URIs to test parsing for compatibility or for ordering preference
569 569 for which target to use.
570 570
571 571 The following optional keys are recognized:
572 572
573 573 snirequired
574 574 Bool indicating if Server Name Indication (SNI) is required to
575 575 connect to this target.
576 576
577 577 tlsversions
578 578 List of bytes indicating which TLS versions are supported by this
579 579 target.
580 580
581 581 By default, clients reflect the target order advertised by servers
582 582 and servers will use the first client-advertised target when picking
583 583 a redirect target. So targets should be advertised in the order the
584 584 server prefers they be used.
585 585 """
586 586 return []
587 587
588 588 def wireprotocommand(name, args=None, permission='push', cachekeyfn=None,
589 589 extracapabilitiesfn=None):
590 590 """Decorator to declare a wire protocol command.
591 591
592 592 ``name`` is the name of the wire protocol command being provided.
593 593
594 594 ``args`` is a dict defining arguments accepted by the command. Keys are
595 595 the argument name. Values are dicts with the following keys:
596 596
597 597 ``type``
598 598 The argument data type. Must be one of the following string
599 599 literals: ``bytes``, ``int``, ``list``, ``dict``, ``set``,
600 600 or ``bool``.
601 601
602 602 ``default``
603 603 A callable returning the default value for this argument. If not
604 604 specified, ``None`` will be the default value.
605 605
606 606 ``example``
607 607 An example value for this argument.
608 608
609 609 ``validvalues``
610 610 Set of recognized values for this argument.
611 611
612 612 ``permission`` defines the permission type needed to run this command.
613 613 Can be ``push`` or ``pull``. These roughly map to read-write and read-only,
614 614 respectively. Default is to assume command requires ``push`` permissions
615 615 because otherwise commands not declaring their permissions could modify
616 616 a repository that is supposed to be read-only.
617 617
618 618 ``cachekeyfn`` defines an optional callable that can derive the
619 619 cache key for this request.
620 620
621 621 ``extracapabilitiesfn`` defines an optional callable that defines extra
622 622 command capabilities/parameters that are advertised next to the command
623 623 in the capabilities data structure describing the server. The callable
624 624 receives as arguments the repository and protocol objects. It returns
625 625 a dict of extra fields to add to the command descriptor.
626 626
627 627 Wire protocol commands are generators of objects to be serialized and
628 628 sent to the client.
629 629
630 630 If a command raises an uncaught exception, this will be translated into
631 631 a command error.
632 632
633 633 All commands can opt in to being cacheable by defining a function
634 634 (``cachekeyfn``) that is called to derive a cache key. This function
635 635 receives the same arguments as the command itself plus a ``cacher``
636 636 argument containing the active cacher for the request and returns a bytes
637 637 containing the key in a cache the response to this command may be cached
638 638 under.
639 639 """
640 640 transports = {k for k, v in wireprototypes.TRANSPORTS.items()
641 641 if v['version'] == 2}
642 642
643 643 if permission not in ('push', 'pull'):
644 644 raise error.ProgrammingError('invalid wire protocol permission; '
645 645 'got %s; expected "push" or "pull"' %
646 646 permission)
647 647
648 648 if args is None:
649 649 args = {}
650 650
651 651 if not isinstance(args, dict):
652 652 raise error.ProgrammingError('arguments for version 2 commands '
653 653 'must be declared as dicts')
654 654
655 655 for arg, meta in args.items():
656 656 if arg == '*':
657 657 raise error.ProgrammingError('* argument name not allowed on '
658 658 'version 2 commands')
659 659
660 660 if not isinstance(meta, dict):
661 661 raise error.ProgrammingError('arguments for version 2 commands '
662 662 'must declare metadata as a dict')
663 663
664 664 if 'type' not in meta:
665 665 raise error.ProgrammingError('%s argument for command %s does not '
666 666 'declare type field' % (arg, name))
667 667
668 668 if meta['type'] not in ('bytes', 'int', 'list', 'dict', 'set', 'bool'):
669 669 raise error.ProgrammingError('%s argument for command %s has '
670 670 'illegal type: %s' % (arg, name,
671 671 meta['type']))
672 672
673 673 if 'example' not in meta:
674 674 raise error.ProgrammingError('%s argument for command %s does not '
675 675 'declare example field' % (arg, name))
676 676
677 677 meta['required'] = 'default' not in meta
678 678
679 679 meta.setdefault('default', lambda: None)
680 680 meta.setdefault('validvalues', None)
681 681
682 682 def register(func):
683 683 if name in COMMANDS:
684 684 raise error.ProgrammingError('%s command already registered '
685 685 'for version 2' % name)
686 686
687 687 COMMANDS[name] = wireprototypes.commandentry(
688 688 func, args=args, transports=transports, permission=permission,
689 689 cachekeyfn=cachekeyfn, extracapabilitiesfn=extracapabilitiesfn)
690 690
691 691 return func
692 692
693 693 return register
694 694
695 695 def makecommandcachekeyfn(command, localversion=None, allargs=False):
696 696 """Construct a cache key derivation function with common features.
697 697
698 698 By default, the cache key is a hash of:
699 699
700 700 * The command name.
701 701 * A global cache version number.
702 702 * A local cache version number (passed via ``localversion``).
703 703 * All the arguments passed to the command.
704 704 * The media type used.
705 705 * Wire protocol version string.
706 706 * The repository path.
707 707 """
708 708 if not allargs:
709 709 raise error.ProgrammingError('only allargs=True is currently supported')
710 710
711 711 if localversion is None:
712 712 raise error.ProgrammingError('must set localversion argument value')
713 713
714 714 def cachekeyfn(repo, proto, cacher, **args):
715 715 spec = COMMANDS[command]
716 716
717 717 # Commands that mutate the repo can not be cached.
718 718 if spec.permission == 'push':
719 719 return None
720 720
721 721 # TODO config option to disable caching.
722 722
723 723 # Our key derivation strategy is to construct a data structure
724 724 # holding everything that could influence cacheability and to hash
725 725 # the CBOR representation of that. Using CBOR seems like it might
726 726 # be overkill. However, simpler hashing mechanisms are prone to
727 727 # duplicate input issues. e.g. if you just concatenate two values,
728 728 # "foo"+"bar" is identical to "fo"+"obar". Using CBOR provides
729 729 # "padding" between values and prevents these problems.
730 730
731 731 # Seed the hash with various data.
732 732 state = {
733 733 # To invalidate all cache keys.
734 734 b'globalversion': GLOBAL_CACHE_VERSION,
735 735 # More granular cache key invalidation.
736 736 b'localversion': localversion,
737 737 # Cache keys are segmented by command.
738 738 b'command': pycompat.sysbytes(command),
739 739 # Throw in the media type and API version strings so changes
740 740 # to exchange semantics invalid cache.
741 741 b'mediatype': FRAMINGTYPE,
742 742 b'version': HTTP_WIREPROTO_V2,
743 743 # So same requests for different repos don't share cache keys.
744 744 b'repo': repo.root,
745 745 }
746 746
747 747 # The arguments passed to us will have already been normalized.
748 748 # Default values will be set, etc. This is important because it
749 749 # means that it doesn't matter if clients send an explicit argument
750 750 # or rely on the default value: it will all normalize to the same
751 751 # set of arguments on the server and therefore the same cache key.
752 752 #
753 753 # Arguments by their very nature must support being encoded to CBOR.
754 754 # And the CBOR encoder is deterministic. So we hash the arguments
755 755 # by feeding the CBOR of their representation into the hasher.
756 756 if allargs:
757 757 state[b'args'] = pycompat.byteskwargs(args)
758 758
759 759 cacher.adjustcachekeystate(state)
760 760
761 761 hasher = hashlib.sha1()
762 762 for chunk in cborutil.streamencode(state):
763 763 hasher.update(chunk)
764 764
765 765 return pycompat.sysbytes(hasher.hexdigest())
766 766
767 767 return cachekeyfn
768 768
769 769 def makeresponsecacher(repo, proto, command, args, objencoderfn,
770 770 redirecttargets, redirecthashes):
771 771 """Construct a cacher for a cacheable command.
772 772
773 773 Returns an ``iwireprotocolcommandcacher`` instance.
774 774
775 775 Extensions can monkeypatch this function to provide custom caching
776 776 backends.
777 777 """
778 778 return None
779 779
780 780 def resolvenodes(repo, revisions):
781 781 """Resolve nodes from a revisions specifier data structure."""
782 782 cl = repo.changelog
783 783 clhasnode = cl.hasnode
784 784
785 785 seen = set()
786 786 nodes = []
787 787
788 788 if not isinstance(revisions, list):
789 789 raise error.WireprotoCommandError('revisions must be defined as an '
790 790 'array')
791 791
792 792 for spec in revisions:
793 793 if b'type' not in spec:
794 794 raise error.WireprotoCommandError(
795 795 'type key not present in revision specifier')
796 796
797 797 typ = spec[b'type']
798 798
799 799 if typ == b'changesetexplicit':
800 800 if b'nodes' not in spec:
801 801 raise error.WireprotoCommandError(
802 802 'nodes key not present in changesetexplicit revision '
803 803 'specifier')
804 804
805 805 for node in spec[b'nodes']:
806 806 if node not in seen:
807 807 nodes.append(node)
808 808 seen.add(node)
809 809
810 810 elif typ == b'changesetexplicitdepth':
811 811 for key in (b'nodes', b'depth'):
812 812 if key not in spec:
813 813 raise error.WireprotoCommandError(
814 814 '%s key not present in changesetexplicitdepth revision '
815 815 'specifier', (key,))
816 816
817 817 for rev in repo.revs(b'ancestors(%ln, %d)', spec[b'nodes'],
818 818 spec[b'depth'] - 1):
819 819 node = cl.node(rev)
820 820
821 821 if node not in seen:
822 822 nodes.append(node)
823 823 seen.add(node)
824 824
825 825 elif typ == b'changesetdagrange':
826 826 for key in (b'roots', b'heads'):
827 827 if key not in spec:
828 828 raise error.WireprotoCommandError(
829 829 '%s key not present in changesetdagrange revision '
830 830 'specifier', (key,))
831 831
832 832 if not spec[b'heads']:
833 833 raise error.WireprotoCommandError(
834 834 'heads key in changesetdagrange cannot be empty')
835 835
836 836 if spec[b'roots']:
837 837 common = [n for n in spec[b'roots'] if clhasnode(n)]
838 838 else:
839 839 common = [nullid]
840 840
841 841 for n in discovery.outgoing(repo, common, spec[b'heads']).missing:
842 842 if n not in seen:
843 843 nodes.append(n)
844 844 seen.add(n)
845 845
846 846 else:
847 847 raise error.WireprotoCommandError(
848 848 'unknown revision specifier type: %s', (typ,))
849 849
850 850 return nodes
851 851
852 852 @wireprotocommand('branchmap', permission='pull')
853 853 def branchmapv2(repo, proto):
854 854 yield {encoding.fromlocal(k): v
855 855 for k, v in repo.branchmap().iteritems()}
856 856
857 857 @wireprotocommand('capabilities', permission='pull')
858 858 def capabilitiesv2(repo, proto):
859 859 yield _capabilitiesv2(repo, proto)
860 860
861 861 @wireprotocommand(
862 862 'changesetdata',
863 863 args={
864 864 'revisions': {
865 865 'type': 'list',
866 866 'example': [{
867 867 b'type': b'changesetexplicit',
868 868 b'nodes': [b'abcdef...'],
869 869 }],
870 870 },
871 871 'fields': {
872 872 'type': 'set',
873 873 'default': set,
874 874 'example': {b'parents', b'revision'},
875 875 'validvalues': {b'bookmarks', b'parents', b'phase', b'revision'},
876 876 },
877 877 },
878 878 permission='pull')
879 879 def changesetdata(repo, proto, revisions, fields):
880 880 # TODO look for unknown fields and abort when they can't be serviced.
881 881 # This could probably be validated by dispatcher using validvalues.
882 882
883 883 cl = repo.changelog
884 884 outgoing = resolvenodes(repo, revisions)
885 885 publishing = repo.publishing()
886 886
887 887 if outgoing:
888 888 repo.hook('preoutgoing', throw=True, source='serve')
889 889
890 890 yield {
891 891 b'totalitems': len(outgoing),
892 892 }
893 893
894 894 # The phases of nodes already transferred to the client may have changed
895 895 # since the client last requested data. We send phase-only records
896 896 # for these revisions, if requested.
897 897 # TODO actually do this. We'll probably want to emit phase heads
898 898 # in the ancestry set of the outgoing revisions. This will ensure
899 899 # that phase updates within that set are seen.
900 900 if b'phase' in fields:
901 901 pass
902 902
903 903 nodebookmarks = {}
904 904 for mark, node in repo._bookmarks.items():
905 905 nodebookmarks.setdefault(node, set()).add(mark)
906 906
907 907 # It is already topologically sorted by revision number.
908 908 for node in outgoing:
909 909 d = {
910 910 b'node': node,
911 911 }
912 912
913 913 if b'parents' in fields:
914 914 d[b'parents'] = cl.parents(node)
915 915
916 916 if b'phase' in fields:
917 917 if publishing:
918 918 d[b'phase'] = b'public'
919 919 else:
920 920 ctx = repo[node]
921 921 d[b'phase'] = ctx.phasestr()
922 922
923 923 if b'bookmarks' in fields and node in nodebookmarks:
924 924 d[b'bookmarks'] = sorted(nodebookmarks[node])
925 925 del nodebookmarks[node]
926 926
927 927 followingmeta = []
928 928 followingdata = []
929 929
930 930 if b'revision' in fields:
931 931 revisiondata = cl.revision(node, raw=True)
932 932 followingmeta.append((b'revision', len(revisiondata)))
933 933 followingdata.append(revisiondata)
934 934
935 935 # TODO make it possible for extensions to wrap a function or register
936 936 # a handler to service custom fields.
937 937
938 938 if followingmeta:
939 939 d[b'fieldsfollowing'] = followingmeta
940 940
941 941 yield d
942 942
943 943 for extra in followingdata:
944 944 yield extra
945 945
946 946 # If requested, send bookmarks from nodes that didn't have revision
947 947 # data sent so receiver is aware of any bookmark updates.
948 948 if b'bookmarks' in fields:
949 949 for node, marks in sorted(nodebookmarks.iteritems()):
950 950 yield {
951 951 b'node': node,
952 952 b'bookmarks': sorted(marks),
953 953 }
954 954
955 955 class FileAccessError(Exception):
956 956 """Represents an error accessing a specific file."""
957 957
958 958 def __init__(self, path, msg, args):
959 959 self.path = path
960 960 self.msg = msg
961 961 self.args = args
962 962
963 963 def getfilestore(repo, proto, path):
964 964 """Obtain a file storage object for use with wire protocol.
965 965
966 966 Exists as a standalone function so extensions can monkeypatch to add
967 967 access control.
968 968 """
969 969 # This seems to work even if the file doesn't exist. So catch
970 970 # "empty" files and return an error.
971 971 fl = repo.file(path)
972 972
973 973 if not len(fl):
974 974 raise FileAccessError(path, 'unknown file: %s', (path,))
975 975
976 976 return fl
977 977
978 def emitfilerevisions(revisions, fields):
979 for revision in revisions:
980 d = {
981 b'node': revision.node,
982 }
983
984 if b'parents' in fields:
985 d[b'parents'] = [revision.p1node, revision.p2node]
986
987 followingmeta = []
988 followingdata = []
989
990 if b'revision' in fields:
991 if revision.revision is not None:
992 followingmeta.append((b'revision', len(revision.revision)))
993 followingdata.append(revision.revision)
994 else:
995 d[b'deltabasenode'] = revision.basenode
996 followingmeta.append((b'delta', len(revision.delta)))
997 followingdata.append(revision.delta)
998
999 if followingmeta:
1000 d[b'fieldsfollowing'] = followingmeta
1001
1002 yield d
1003
1004 for extra in followingdata:
1005 yield extra
1006
978 1007 @wireprotocommand(
979 1008 'filedata',
980 1009 args={
981 1010 'haveparents': {
982 1011 'type': 'bool',
983 1012 'default': lambda: False,
984 1013 'example': True,
985 1014 },
986 1015 'nodes': {
987 1016 'type': 'list',
988 1017 'example': [b'0123456...'],
989 1018 },
990 1019 'fields': {
991 1020 'type': 'set',
992 1021 'default': set,
993 1022 'example': {b'parents', b'revision'},
994 1023 'validvalues': {b'parents', b'revision'},
995 1024 },
996 1025 'path': {
997 1026 'type': 'bytes',
998 1027 'example': b'foo.txt',
999 1028 }
1000 1029 },
1001 1030 permission='pull',
1002 1031 # TODO censoring a file revision won't invalidate the cache.
1003 1032 # Figure out a way to take censoring into account when deriving
1004 1033 # the cache key.
1005 1034 cachekeyfn=makecommandcachekeyfn('filedata', 1, allargs=True))
1006 1035 def filedata(repo, proto, haveparents, nodes, fields, path):
1007 1036 try:
1008 1037 # Extensions may wish to access the protocol handler.
1009 1038 store = getfilestore(repo, proto, path)
1010 1039 except FileAccessError as e:
1011 1040 raise error.WireprotoCommandError(e.msg, e.args)
1012 1041
1013 1042 # Validate requested nodes.
1014 1043 for node in nodes:
1015 1044 try:
1016 1045 store.rev(node)
1017 1046 except error.LookupError:
1018 1047 raise error.WireprotoCommandError('unknown file node: %s',
1019 1048 (hex(node),))
1020 1049
1021 1050 revisions = store.emitrevisions(nodes,
1022 1051 revisiondata=b'revision' in fields,
1023 1052 assumehaveparentrevisions=haveparents)
1024 1053
1025 1054 yield {
1026 1055 b'totalitems': len(nodes),
1027 1056 }
1028 1057
1029 for revision in revisions:
1030 d = {
1031 b'node': revision.node,
1032 }
1033
1034 if b'parents' in fields:
1035 d[b'parents'] = [revision.p1node, revision.p2node]
1036
1037 followingmeta = []
1038 followingdata = []
1039
1040 if b'revision' in fields:
1041 if revision.revision is not None:
1042 followingmeta.append((b'revision', len(revision.revision)))
1043 followingdata.append(revision.revision)
1044 else:
1045 d[b'deltabasenode'] = revision.basenode
1046 followingmeta.append((b'delta', len(revision.delta)))
1047 followingdata.append(revision.delta)
1048
1049 if followingmeta:
1050 d[b'fieldsfollowing'] = followingmeta
1051
1052 yield d
1053
1054 for extra in followingdata:
1055 yield extra
1058 for o in emitfilerevisions(revisions, fields):
1059 yield o
1056 1060
1057 1061 @wireprotocommand(
1058 1062 'heads',
1059 1063 args={
1060 1064 'publiconly': {
1061 1065 'type': 'bool',
1062 1066 'default': lambda: False,
1063 1067 'example': False,
1064 1068 },
1065 1069 },
1066 1070 permission='pull')
1067 1071 def headsv2(repo, proto, publiconly):
1068 1072 if publiconly:
1069 1073 repo = repo.filtered('immutable')
1070 1074
1071 1075 yield repo.heads()
1072 1076
1073 1077 @wireprotocommand(
1074 1078 'known',
1075 1079 args={
1076 1080 'nodes': {
1077 1081 'type': 'list',
1078 1082 'default': list,
1079 1083 'example': [b'deadbeef'],
1080 1084 },
1081 1085 },
1082 1086 permission='pull')
1083 1087 def knownv2(repo, proto, nodes):
1084 1088 result = b''.join(b'1' if n else b'0' for n in repo.known(nodes))
1085 1089 yield result
1086 1090
1087 1091 @wireprotocommand(
1088 1092 'listkeys',
1089 1093 args={
1090 1094 'namespace': {
1091 1095 'type': 'bytes',
1092 1096 'example': b'ns',
1093 1097 },
1094 1098 },
1095 1099 permission='pull')
1096 1100 def listkeysv2(repo, proto, namespace):
1097 1101 keys = repo.listkeys(encoding.tolocal(namespace))
1098 1102 keys = {encoding.fromlocal(k): encoding.fromlocal(v)
1099 1103 for k, v in keys.iteritems()}
1100 1104
1101 1105 yield keys
1102 1106
1103 1107 @wireprotocommand(
1104 1108 'lookup',
1105 1109 args={
1106 1110 'key': {
1107 1111 'type': 'bytes',
1108 1112 'example': b'foo',
1109 1113 },
1110 1114 },
1111 1115 permission='pull')
1112 1116 def lookupv2(repo, proto, key):
1113 1117 key = encoding.tolocal(key)
1114 1118
1115 1119 # TODO handle exception.
1116 1120 node = repo.lookup(key)
1117 1121
1118 1122 yield node
1119 1123
1120 1124 def manifestdatacapabilities(repo, proto):
1121 1125 batchsize = repo.ui.configint(
1122 1126 b'experimental', b'server.manifestdata.recommended-batch-size')
1123 1127
1124 1128 return {
1125 1129 b'recommendedbatchsize': batchsize,
1126 1130 }
1127 1131
1128 1132 @wireprotocommand(
1129 1133 'manifestdata',
1130 1134 args={
1131 1135 'nodes': {
1132 1136 'type': 'list',
1133 1137 'example': [b'0123456...'],
1134 1138 },
1135 1139 'haveparents': {
1136 1140 'type': 'bool',
1137 1141 'default': lambda: False,
1138 1142 'example': True,
1139 1143 },
1140 1144 'fields': {
1141 1145 'type': 'set',
1142 1146 'default': set,
1143 1147 'example': {b'parents', b'revision'},
1144 1148 'validvalues': {b'parents', b'revision'},
1145 1149 },
1146 1150 'tree': {
1147 1151 'type': 'bytes',
1148 1152 'example': b'',
1149 1153 },
1150 1154 },
1151 1155 permission='pull',
1152 1156 cachekeyfn=makecommandcachekeyfn('manifestdata', 1, allargs=True),
1153 1157 extracapabilitiesfn=manifestdatacapabilities)
1154 1158 def manifestdata(repo, proto, haveparents, nodes, fields, tree):
1155 1159 store = repo.manifestlog.getstorage(tree)
1156 1160
1157 1161 # Validate the node is known and abort on unknown revisions.
1158 1162 for node in nodes:
1159 1163 try:
1160 1164 store.rev(node)
1161 1165 except error.LookupError:
1162 1166 raise error.WireprotoCommandError(
1163 1167 'unknown node: %s', (node,))
1164 1168
1165 1169 revisions = store.emitrevisions(nodes,
1166 1170 revisiondata=b'revision' in fields,
1167 1171 assumehaveparentrevisions=haveparents)
1168 1172
1169 1173 yield {
1170 1174 b'totalitems': len(nodes),
1171 1175 }
1172 1176
1173 1177 for revision in revisions:
1174 1178 d = {
1175 1179 b'node': revision.node,
1176 1180 }
1177 1181
1178 1182 if b'parents' in fields:
1179 1183 d[b'parents'] = [revision.p1node, revision.p2node]
1180 1184
1181 1185 followingmeta = []
1182 1186 followingdata = []
1183 1187
1184 1188 if b'revision' in fields:
1185 1189 if revision.revision is not None:
1186 1190 followingmeta.append((b'revision', len(revision.revision)))
1187 1191 followingdata.append(revision.revision)
1188 1192 else:
1189 1193 d[b'deltabasenode'] = revision.basenode
1190 1194 followingmeta.append((b'delta', len(revision.delta)))
1191 1195 followingdata.append(revision.delta)
1192 1196
1193 1197 if followingmeta:
1194 1198 d[b'fieldsfollowing'] = followingmeta
1195 1199
1196 1200 yield d
1197 1201
1198 1202 for extra in followingdata:
1199 1203 yield extra
1200 1204
1201 1205 @wireprotocommand(
1202 1206 'pushkey',
1203 1207 args={
1204 1208 'namespace': {
1205 1209 'type': 'bytes',
1206 1210 'example': b'ns',
1207 1211 },
1208 1212 'key': {
1209 1213 'type': 'bytes',
1210 1214 'example': b'key',
1211 1215 },
1212 1216 'old': {
1213 1217 'type': 'bytes',
1214 1218 'example': b'old',
1215 1219 },
1216 1220 'new': {
1217 1221 'type': 'bytes',
1218 1222 'example': 'new',
1219 1223 },
1220 1224 },
1221 1225 permission='push')
1222 1226 def pushkeyv2(repo, proto, namespace, key, old, new):
1223 1227 # TODO handle ui output redirection
1224 1228 yield repo.pushkey(encoding.tolocal(namespace),
1225 1229 encoding.tolocal(key),
1226 1230 encoding.tolocal(old),
1227 1231 encoding.tolocal(new))
General Comments 0
You need to be logged in to leave comments. Login now