##// END OF EJS Templates
sidedata: send the correct revision data for wireproto v2...
Joerg Sonnenberger -
r46679:14ff4929 default
parent child Browse files
Show More
@@ -1,1613 +1,1613 b''
1 1 # Copyright 21 May 2005 - (c) 2005 Jake Edge <jake@edge2.net>
2 2 # Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
3 3 #
4 4 # This software may be used and distributed according to the terms of the
5 5 # GNU General Public License version 2 or any later version.
6 6
7 7 from __future__ import absolute_import
8 8
9 9 import collections
10 10 import contextlib
11 11
12 12 from .i18n import _
13 13 from .node import (
14 14 hex,
15 15 nullid,
16 16 )
17 17 from . import (
18 18 discovery,
19 19 encoding,
20 20 error,
21 21 match as matchmod,
22 22 narrowspec,
23 23 pycompat,
24 24 streamclone,
25 25 templatefilters,
26 26 util,
27 27 wireprotoframing,
28 28 wireprototypes,
29 29 )
30 30 from .interfaces import util as interfaceutil
31 31 from .utils import (
32 32 cborutil,
33 33 hashutil,
34 34 stringutil,
35 35 )
36 36
37 37 FRAMINGTYPE = b'application/mercurial-exp-framing-0006'
38 38
39 39 HTTP_WIREPROTO_V2 = wireprototypes.HTTP_WIREPROTO_V2
40 40
41 41 COMMANDS = wireprototypes.commanddict()
42 42
43 43 # Value inserted into cache key computation function. Change the value to
44 44 # force new cache keys for every command request. This should be done when
45 45 # there is a change to how caching works, etc.
46 46 GLOBAL_CACHE_VERSION = 1
47 47
48 48
49 49 def handlehttpv2request(rctx, req, res, checkperm, urlparts):
50 50 from .hgweb import common as hgwebcommon
51 51
52 52 # URL space looks like: <permissions>/<command>, where <permission> can
53 53 # be ``ro`` or ``rw`` to signal read-only or read-write, respectively.
54 54
55 55 # Root URL does nothing meaningful... yet.
56 56 if not urlparts:
57 57 res.status = b'200 OK'
58 58 res.headers[b'Content-Type'] = b'text/plain'
59 59 res.setbodybytes(_(b'HTTP version 2 API handler'))
60 60 return
61 61
62 62 if len(urlparts) == 1:
63 63 res.status = b'404 Not Found'
64 64 res.headers[b'Content-Type'] = b'text/plain'
65 65 res.setbodybytes(
66 66 _(b'do not know how to process %s\n') % req.dispatchpath
67 67 )
68 68 return
69 69
70 70 permission, command = urlparts[0:2]
71 71
72 72 if permission not in (b'ro', b'rw'):
73 73 res.status = b'404 Not Found'
74 74 res.headers[b'Content-Type'] = b'text/plain'
75 75 res.setbodybytes(_(b'unknown permission: %s') % permission)
76 76 return
77 77
78 78 if req.method != b'POST':
79 79 res.status = b'405 Method Not Allowed'
80 80 res.headers[b'Allow'] = b'POST'
81 81 res.setbodybytes(_(b'commands require POST requests'))
82 82 return
83 83
84 84 # At some point we'll want to use our own API instead of recycling the
85 85 # behavior of version 1 of the wire protocol...
86 86 # TODO return reasonable responses - not responses that overload the
87 87 # HTTP status line message for error reporting.
88 88 try:
89 89 checkperm(rctx, req, b'pull' if permission == b'ro' else b'push')
90 90 except hgwebcommon.ErrorResponse as e:
91 91 res.status = hgwebcommon.statusmessage(e.code, pycompat.bytestr(e))
92 92 for k, v in e.headers:
93 93 res.headers[k] = v
94 94 res.setbodybytes(b'permission denied')
95 95 return
96 96
97 97 # We have a special endpoint to reflect the request back at the client.
98 98 if command == b'debugreflect':
99 99 _processhttpv2reflectrequest(rctx.repo.ui, rctx.repo, req, res)
100 100 return
101 101
102 102 # Extra commands that we handle that aren't really wire protocol
103 103 # commands. Think extra hard before making this hackery available to
104 104 # extension.
105 105 extracommands = {b'multirequest'}
106 106
107 107 if command not in COMMANDS and command not in extracommands:
108 108 res.status = b'404 Not Found'
109 109 res.headers[b'Content-Type'] = b'text/plain'
110 110 res.setbodybytes(_(b'unknown wire protocol command: %s\n') % command)
111 111 return
112 112
113 113 repo = rctx.repo
114 114 ui = repo.ui
115 115
116 116 proto = httpv2protocolhandler(req, ui)
117 117
118 118 if (
119 119 not COMMANDS.commandavailable(command, proto)
120 120 and command not in extracommands
121 121 ):
122 122 res.status = b'404 Not Found'
123 123 res.headers[b'Content-Type'] = b'text/plain'
124 124 res.setbodybytes(_(b'invalid wire protocol command: %s') % command)
125 125 return
126 126
127 127 # TODO consider cases where proxies may add additional Accept headers.
128 128 if req.headers.get(b'Accept') != FRAMINGTYPE:
129 129 res.status = b'406 Not Acceptable'
130 130 res.headers[b'Content-Type'] = b'text/plain'
131 131 res.setbodybytes(
132 132 _(b'client MUST specify Accept header with value: %s\n')
133 133 % FRAMINGTYPE
134 134 )
135 135 return
136 136
137 137 if req.headers.get(b'Content-Type') != FRAMINGTYPE:
138 138 res.status = b'415 Unsupported Media Type'
139 139 # TODO we should send a response with appropriate media type,
140 140 # since client does Accept it.
141 141 res.headers[b'Content-Type'] = b'text/plain'
142 142 res.setbodybytes(
143 143 _(b'client MUST send Content-Type header with value: %s\n')
144 144 % FRAMINGTYPE
145 145 )
146 146 return
147 147
148 148 _processhttpv2request(ui, repo, req, res, permission, command, proto)
149 149
150 150
151 151 def _processhttpv2reflectrequest(ui, repo, req, res):
152 152 """Reads unified frame protocol request and dumps out state to client.
153 153
154 154 This special endpoint can be used to help debug the wire protocol.
155 155
156 156 Instead of routing the request through the normal dispatch mechanism,
157 157 we instead read all frames, decode them, and feed them into our state
158 158 tracker. We then dump the log of all that activity back out to the
159 159 client.
160 160 """
161 161 # Reflection APIs have a history of being abused, accidentally disclosing
162 162 # sensitive data, etc. So we have a config knob.
163 163 if not ui.configbool(b'experimental', b'web.api.debugreflect'):
164 164 res.status = b'404 Not Found'
165 165 res.headers[b'Content-Type'] = b'text/plain'
166 166 res.setbodybytes(_(b'debugreflect service not available'))
167 167 return
168 168
169 169 # We assume we have a unified framing protocol request body.
170 170
171 171 reactor = wireprotoframing.serverreactor(ui)
172 172 states = []
173 173
174 174 while True:
175 175 frame = wireprotoframing.readframe(req.bodyfh)
176 176
177 177 if not frame:
178 178 states.append(b'received: <no frame>')
179 179 break
180 180
181 181 states.append(
182 182 b'received: %d %d %d %s'
183 183 % (frame.typeid, frame.flags, frame.requestid, frame.payload)
184 184 )
185 185
186 186 action, meta = reactor.onframerecv(frame)
187 187 states.append(templatefilters.json((action, meta)))
188 188
189 189 action, meta = reactor.oninputeof()
190 190 meta[b'action'] = action
191 191 states.append(templatefilters.json(meta))
192 192
193 193 res.status = b'200 OK'
194 194 res.headers[b'Content-Type'] = b'text/plain'
195 195 res.setbodybytes(b'\n'.join(states))
196 196
197 197
198 198 def _processhttpv2request(ui, repo, req, res, authedperm, reqcommand, proto):
199 199 """Post-validation handler for HTTPv2 requests.
200 200
201 201 Called when the HTTP request contains unified frame-based protocol
202 202 frames for evaluation.
203 203 """
204 204 # TODO Some HTTP clients are full duplex and can receive data before
205 205 # the entire request is transmitted. Figure out a way to indicate support
206 206 # for that so we can opt into full duplex mode.
207 207 reactor = wireprotoframing.serverreactor(ui, deferoutput=True)
208 208 seencommand = False
209 209
210 210 outstream = None
211 211
212 212 while True:
213 213 frame = wireprotoframing.readframe(req.bodyfh)
214 214 if not frame:
215 215 break
216 216
217 217 action, meta = reactor.onframerecv(frame)
218 218
219 219 if action == b'wantframe':
220 220 # Need more data before we can do anything.
221 221 continue
222 222 elif action == b'runcommand':
223 223 # Defer creating output stream because we need to wait for
224 224 # protocol settings frames so proper encoding can be applied.
225 225 if not outstream:
226 226 outstream = reactor.makeoutputstream()
227 227
228 228 sentoutput = _httpv2runcommand(
229 229 ui,
230 230 repo,
231 231 req,
232 232 res,
233 233 authedperm,
234 234 reqcommand,
235 235 reactor,
236 236 outstream,
237 237 meta,
238 238 issubsequent=seencommand,
239 239 )
240 240
241 241 if sentoutput:
242 242 return
243 243
244 244 seencommand = True
245 245
246 246 elif action == b'error':
247 247 # TODO define proper error mechanism.
248 248 res.status = b'200 OK'
249 249 res.headers[b'Content-Type'] = b'text/plain'
250 250 res.setbodybytes(meta[b'message'] + b'\n')
251 251 return
252 252 else:
253 253 raise error.ProgrammingError(
254 254 b'unhandled action from frame processor: %s' % action
255 255 )
256 256
257 257 action, meta = reactor.oninputeof()
258 258 if action == b'sendframes':
259 259 # We assume we haven't started sending the response yet. If we're
260 260 # wrong, the response type will raise an exception.
261 261 res.status = b'200 OK'
262 262 res.headers[b'Content-Type'] = FRAMINGTYPE
263 263 res.setbodygen(meta[b'framegen'])
264 264 elif action == b'noop':
265 265 pass
266 266 else:
267 267 raise error.ProgrammingError(
268 268 b'unhandled action from frame processor: %s' % action
269 269 )
270 270
271 271
272 272 def _httpv2runcommand(
273 273 ui,
274 274 repo,
275 275 req,
276 276 res,
277 277 authedperm,
278 278 reqcommand,
279 279 reactor,
280 280 outstream,
281 281 command,
282 282 issubsequent,
283 283 ):
284 284 """Dispatch a wire protocol command made from HTTPv2 requests.
285 285
286 286 The authenticated permission (``authedperm``) along with the original
287 287 command from the URL (``reqcommand``) are passed in.
288 288 """
289 289 # We already validated that the session has permissions to perform the
290 290 # actions in ``authedperm``. In the unified frame protocol, the canonical
291 291 # command to run is expressed in a frame. However, the URL also requested
292 292 # to run a specific command. We need to be careful that the command we
293 293 # run doesn't have permissions requirements greater than what was granted
294 294 # by ``authedperm``.
295 295 #
296 296 # Our rule for this is we only allow one command per HTTP request and
297 297 # that command must match the command in the URL. However, we make
298 298 # an exception for the ``multirequest`` URL. This URL is allowed to
299 299 # execute multiple commands. We double check permissions of each command
300 300 # as it is invoked to ensure there is no privilege escalation.
301 301 # TODO consider allowing multiple commands to regular command URLs
302 302 # iff each command is the same.
303 303
304 304 proto = httpv2protocolhandler(req, ui, args=command[b'args'])
305 305
306 306 if reqcommand == b'multirequest':
307 307 if not COMMANDS.commandavailable(command[b'command'], proto):
308 308 # TODO proper error mechanism
309 309 res.status = b'200 OK'
310 310 res.headers[b'Content-Type'] = b'text/plain'
311 311 res.setbodybytes(
312 312 _(b'wire protocol command not available: %s')
313 313 % command[b'command']
314 314 )
315 315 return True
316 316
317 317 # TODO don't use assert here, since it may be elided by -O.
318 318 assert authedperm in (b'ro', b'rw')
319 319 wirecommand = COMMANDS[command[b'command']]
320 320 assert wirecommand.permission in (b'push', b'pull')
321 321
322 322 if authedperm == b'ro' and wirecommand.permission != b'pull':
323 323 # TODO proper error mechanism
324 324 res.status = b'403 Forbidden'
325 325 res.headers[b'Content-Type'] = b'text/plain'
326 326 res.setbodybytes(
327 327 _(b'insufficient permissions to execute command: %s')
328 328 % command[b'command']
329 329 )
330 330 return True
331 331
332 332 # TODO should we also call checkperm() here? Maybe not if we're going
333 333 # to overhaul that API. The granted scope from the URL check should
334 334 # be good enough.
335 335
336 336 else:
337 337 # Don't allow multiple commands outside of ``multirequest`` URL.
338 338 if issubsequent:
339 339 # TODO proper error mechanism
340 340 res.status = b'200 OK'
341 341 res.headers[b'Content-Type'] = b'text/plain'
342 342 res.setbodybytes(
343 343 _(b'multiple commands cannot be issued to this URL')
344 344 )
345 345 return True
346 346
347 347 if reqcommand != command[b'command']:
348 348 # TODO define proper error mechanism
349 349 res.status = b'200 OK'
350 350 res.headers[b'Content-Type'] = b'text/plain'
351 351 res.setbodybytes(_(b'command in frame must match command in URL'))
352 352 return True
353 353
354 354 res.status = b'200 OK'
355 355 res.headers[b'Content-Type'] = FRAMINGTYPE
356 356
357 357 try:
358 358 objs = dispatch(repo, proto, command[b'command'], command[b'redirect'])
359 359
360 360 action, meta = reactor.oncommandresponsereadyobjects(
361 361 outstream, command[b'requestid'], objs
362 362 )
363 363
364 364 except error.WireprotoCommandError as e:
365 365 action, meta = reactor.oncommanderror(
366 366 outstream, command[b'requestid'], e.message, e.messageargs
367 367 )
368 368
369 369 except Exception as e:
370 370 action, meta = reactor.onservererror(
371 371 outstream,
372 372 command[b'requestid'],
373 373 _(b'exception when invoking command: %s')
374 374 % stringutil.forcebytestr(e),
375 375 )
376 376
377 377 if action == b'sendframes':
378 378 res.setbodygen(meta[b'framegen'])
379 379 return True
380 380 elif action == b'noop':
381 381 return False
382 382 else:
383 383 raise error.ProgrammingError(
384 384 b'unhandled event from reactor: %s' % action
385 385 )
386 386
387 387
388 388 def getdispatchrepo(repo, proto, command):
389 389 viewconfig = repo.ui.config(b'server', b'view')
390 390 return repo.filtered(viewconfig)
391 391
392 392
393 393 def dispatch(repo, proto, command, redirect):
394 394 """Run a wire protocol command.
395 395
396 396 Returns an iterable of objects that will be sent to the client.
397 397 """
398 398 repo = getdispatchrepo(repo, proto, command)
399 399
400 400 entry = COMMANDS[command]
401 401 func = entry.func
402 402 spec = entry.args
403 403
404 404 args = proto.getargs(spec)
405 405
406 406 # There is some duplicate boilerplate code here for calling the command and
407 407 # emitting objects. It is either that or a lot of indented code that looks
408 408 # like a pyramid (since there are a lot of code paths that result in not
409 409 # using the cacher).
410 410 callcommand = lambda: func(repo, proto, **pycompat.strkwargs(args))
411 411
412 412 # Request is not cacheable. Don't bother instantiating a cacher.
413 413 if not entry.cachekeyfn:
414 414 for o in callcommand():
415 415 yield o
416 416 return
417 417
418 418 if redirect:
419 419 redirecttargets = redirect[b'targets']
420 420 redirecthashes = redirect[b'hashes']
421 421 else:
422 422 redirecttargets = []
423 423 redirecthashes = []
424 424
425 425 cacher = makeresponsecacher(
426 426 repo,
427 427 proto,
428 428 command,
429 429 args,
430 430 cborutil.streamencode,
431 431 redirecttargets=redirecttargets,
432 432 redirecthashes=redirecthashes,
433 433 )
434 434
435 435 # But we have no cacher. Do default handling.
436 436 if not cacher:
437 437 for o in callcommand():
438 438 yield o
439 439 return
440 440
441 441 with cacher:
442 442 cachekey = entry.cachekeyfn(
443 443 repo, proto, cacher, **pycompat.strkwargs(args)
444 444 )
445 445
446 446 # No cache key or the cacher doesn't like it. Do default handling.
447 447 if cachekey is None or not cacher.setcachekey(cachekey):
448 448 for o in callcommand():
449 449 yield o
450 450 return
451 451
452 452 # Serve it from the cache, if possible.
453 453 cached = cacher.lookup()
454 454
455 455 if cached:
456 456 for o in cached[b'objs']:
457 457 yield o
458 458 return
459 459
460 460 # Else call the command and feed its output into the cacher, allowing
461 461 # the cacher to buffer/mutate objects as it desires.
462 462 for o in callcommand():
463 463 for o in cacher.onobject(o):
464 464 yield o
465 465
466 466 for o in cacher.onfinished():
467 467 yield o
468 468
469 469
470 470 @interfaceutil.implementer(wireprototypes.baseprotocolhandler)
471 471 class httpv2protocolhandler(object):
472 472 def __init__(self, req, ui, args=None):
473 473 self._req = req
474 474 self._ui = ui
475 475 self._args = args
476 476
477 477 @property
478 478 def name(self):
479 479 return HTTP_WIREPROTO_V2
480 480
481 481 def getargs(self, args):
482 482 # First look for args that were passed but aren't registered on this
483 483 # command.
484 484 extra = set(self._args) - set(args)
485 485 if extra:
486 486 raise error.WireprotoCommandError(
487 487 b'unsupported argument to command: %s'
488 488 % b', '.join(sorted(extra))
489 489 )
490 490
491 491 # And look for required arguments that are missing.
492 492 missing = {a for a in args if args[a][b'required']} - set(self._args)
493 493
494 494 if missing:
495 495 raise error.WireprotoCommandError(
496 496 b'missing required arguments: %s' % b', '.join(sorted(missing))
497 497 )
498 498
499 499 # Now derive the arguments to pass to the command, taking into
500 500 # account the arguments specified by the client.
501 501 data = {}
502 502 for k, meta in sorted(args.items()):
503 503 # This argument wasn't passed by the client.
504 504 if k not in self._args:
505 505 data[k] = meta[b'default']()
506 506 continue
507 507
508 508 v = self._args[k]
509 509
510 510 # Sets may be expressed as lists. Silently normalize.
511 511 if meta[b'type'] == b'set' and isinstance(v, list):
512 512 v = set(v)
513 513
514 514 # TODO consider more/stronger type validation.
515 515
516 516 data[k] = v
517 517
518 518 return data
519 519
520 520 def getprotocaps(self):
521 521 # Protocol capabilities are currently not implemented for HTTP V2.
522 522 return set()
523 523
524 524 def getpayload(self):
525 525 raise NotImplementedError
526 526
527 527 @contextlib.contextmanager
528 528 def mayberedirectstdio(self):
529 529 raise NotImplementedError
530 530
531 531 def client(self):
532 532 raise NotImplementedError
533 533
534 534 def addcapabilities(self, repo, caps):
535 535 return caps
536 536
537 537 def checkperm(self, perm):
538 538 raise NotImplementedError
539 539
540 540
541 541 def httpv2apidescriptor(req, repo):
542 542 proto = httpv2protocolhandler(req, repo.ui)
543 543
544 544 return _capabilitiesv2(repo, proto)
545 545
546 546
547 547 def _capabilitiesv2(repo, proto):
548 548 """Obtain the set of capabilities for version 2 transports.
549 549
550 550 These capabilities are distinct from the capabilities for version 1
551 551 transports.
552 552 """
553 553 caps = {
554 554 b'commands': {},
555 555 b'framingmediatypes': [FRAMINGTYPE],
556 556 b'pathfilterprefixes': set(narrowspec.VALID_PREFIXES),
557 557 }
558 558
559 559 for command, entry in COMMANDS.items():
560 560 args = {}
561 561
562 562 for arg, meta in entry.args.items():
563 563 args[arg] = {
564 564 # TODO should this be a normalized type using CBOR's
565 565 # terminology?
566 566 b'type': meta[b'type'],
567 567 b'required': meta[b'required'],
568 568 }
569 569
570 570 if not meta[b'required']:
571 571 args[arg][b'default'] = meta[b'default']()
572 572
573 573 if meta[b'validvalues']:
574 574 args[arg][b'validvalues'] = meta[b'validvalues']
575 575
576 576 # TODO this type of check should be defined in a per-command callback.
577 577 if (
578 578 command == b'rawstorefiledata'
579 579 and not streamclone.allowservergeneration(repo)
580 580 ):
581 581 continue
582 582
583 583 caps[b'commands'][command] = {
584 584 b'args': args,
585 585 b'permissions': [entry.permission],
586 586 }
587 587
588 588 if entry.extracapabilitiesfn:
589 589 extracaps = entry.extracapabilitiesfn(repo, proto)
590 590 caps[b'commands'][command].update(extracaps)
591 591
592 592 caps[b'rawrepoformats'] = sorted(repo.requirements & repo.supportedformats)
593 593
594 594 targets = getadvertisedredirecttargets(repo, proto)
595 595 if targets:
596 596 caps[b'redirect'] = {
597 597 b'targets': [],
598 598 b'hashes': [b'sha256', b'sha1'],
599 599 }
600 600
601 601 for target in targets:
602 602 entry = {
603 603 b'name': target[b'name'],
604 604 b'protocol': target[b'protocol'],
605 605 b'uris': target[b'uris'],
606 606 }
607 607
608 608 for key in (b'snirequired', b'tlsversions'):
609 609 if key in target:
610 610 entry[key] = target[key]
611 611
612 612 caps[b'redirect'][b'targets'].append(entry)
613 613
614 614 return proto.addcapabilities(repo, caps)
615 615
616 616
617 617 def getadvertisedredirecttargets(repo, proto):
618 618 """Obtain a list of content redirect targets.
619 619
620 620 Returns a list containing potential redirect targets that will be
621 621 advertised in capabilities data. Each dict MUST have the following
622 622 keys:
623 623
624 624 name
625 625 The name of this redirect target. This is the identifier clients use
626 626 to refer to a target. It is transferred as part of every command
627 627 request.
628 628
629 629 protocol
630 630 Network protocol used by this target. Typically this is the string
631 631 in front of the ``://`` in a URL. e.g. ``https``.
632 632
633 633 uris
634 634 List of representative URIs for this target. Clients can use the
635 635 URIs to test parsing for compatibility or for ordering preference
636 636 for which target to use.
637 637
638 638 The following optional keys are recognized:
639 639
640 640 snirequired
641 641 Bool indicating if Server Name Indication (SNI) is required to
642 642 connect to this target.
643 643
644 644 tlsversions
645 645 List of bytes indicating which TLS versions are supported by this
646 646 target.
647 647
648 648 By default, clients reflect the target order advertised by servers
649 649 and servers will use the first client-advertised target when picking
650 650 a redirect target. So targets should be advertised in the order the
651 651 server prefers they be used.
652 652 """
653 653 return []
654 654
655 655
656 656 def wireprotocommand(
657 657 name,
658 658 args=None,
659 659 permission=b'push',
660 660 cachekeyfn=None,
661 661 extracapabilitiesfn=None,
662 662 ):
663 663 """Decorator to declare a wire protocol command.
664 664
665 665 ``name`` is the name of the wire protocol command being provided.
666 666
667 667 ``args`` is a dict defining arguments accepted by the command. Keys are
668 668 the argument name. Values are dicts with the following keys:
669 669
670 670 ``type``
671 671 The argument data type. Must be one of the following string
672 672 literals: ``bytes``, ``int``, ``list``, ``dict``, ``set``,
673 673 or ``bool``.
674 674
675 675 ``default``
676 676 A callable returning the default value for this argument. If not
677 677 specified, ``None`` will be the default value.
678 678
679 679 ``example``
680 680 An example value for this argument.
681 681
682 682 ``validvalues``
683 683 Set of recognized values for this argument.
684 684
685 685 ``permission`` defines the permission type needed to run this command.
686 686 Can be ``push`` or ``pull``. These roughly map to read-write and read-only,
687 687 respectively. Default is to assume command requires ``push`` permissions
688 688 because otherwise commands not declaring their permissions could modify
689 689 a repository that is supposed to be read-only.
690 690
691 691 ``cachekeyfn`` defines an optional callable that can derive the
692 692 cache key for this request.
693 693
694 694 ``extracapabilitiesfn`` defines an optional callable that defines extra
695 695 command capabilities/parameters that are advertised next to the command
696 696 in the capabilities data structure describing the server. The callable
697 697 receives as arguments the repository and protocol objects. It returns
698 698 a dict of extra fields to add to the command descriptor.
699 699
700 700 Wire protocol commands are generators of objects to be serialized and
701 701 sent to the client.
702 702
703 703 If a command raises an uncaught exception, this will be translated into
704 704 a command error.
705 705
706 706 All commands can opt in to being cacheable by defining a function
707 707 (``cachekeyfn``) that is called to derive a cache key. This function
708 708 receives the same arguments as the command itself plus a ``cacher``
709 709 argument containing the active cacher for the request and returns a bytes
710 710 containing the key in a cache the response to this command may be cached
711 711 under.
712 712 """
713 713 transports = {
714 714 k for k, v in wireprototypes.TRANSPORTS.items() if v[b'version'] == 2
715 715 }
716 716
717 717 if permission not in (b'push', b'pull'):
718 718 raise error.ProgrammingError(
719 719 b'invalid wire protocol permission; '
720 720 b'got %s; expected "push" or "pull"' % permission
721 721 )
722 722
723 723 if args is None:
724 724 args = {}
725 725
726 726 if not isinstance(args, dict):
727 727 raise error.ProgrammingError(
728 728 b'arguments for version 2 commands must be declared as dicts'
729 729 )
730 730
731 731 for arg, meta in args.items():
732 732 if arg == b'*':
733 733 raise error.ProgrammingError(
734 734 b'* argument name not allowed on version 2 commands'
735 735 )
736 736
737 737 if not isinstance(meta, dict):
738 738 raise error.ProgrammingError(
739 739 b'arguments for version 2 commands '
740 740 b'must declare metadata as a dict'
741 741 )
742 742
743 743 if b'type' not in meta:
744 744 raise error.ProgrammingError(
745 745 b'%s argument for command %s does not '
746 746 b'declare type field' % (arg, name)
747 747 )
748 748
749 749 if meta[b'type'] not in (
750 750 b'bytes',
751 751 b'int',
752 752 b'list',
753 753 b'dict',
754 754 b'set',
755 755 b'bool',
756 756 ):
757 757 raise error.ProgrammingError(
758 758 b'%s argument for command %s has '
759 759 b'illegal type: %s' % (arg, name, meta[b'type'])
760 760 )
761 761
762 762 if b'example' not in meta:
763 763 raise error.ProgrammingError(
764 764 b'%s argument for command %s does not '
765 765 b'declare example field' % (arg, name)
766 766 )
767 767
768 768 meta[b'required'] = b'default' not in meta
769 769
770 770 meta.setdefault(b'default', lambda: None)
771 771 meta.setdefault(b'validvalues', None)
772 772
773 773 def register(func):
774 774 if name in COMMANDS:
775 775 raise error.ProgrammingError(
776 776 b'%s command already registered for version 2' % name
777 777 )
778 778
779 779 COMMANDS[name] = wireprototypes.commandentry(
780 780 func,
781 781 args=args,
782 782 transports=transports,
783 783 permission=permission,
784 784 cachekeyfn=cachekeyfn,
785 785 extracapabilitiesfn=extracapabilitiesfn,
786 786 )
787 787
788 788 return func
789 789
790 790 return register
791 791
792 792
793 793 def makecommandcachekeyfn(command, localversion=None, allargs=False):
794 794 """Construct a cache key derivation function with common features.
795 795
796 796 By default, the cache key is a hash of:
797 797
798 798 * The command name.
799 799 * A global cache version number.
800 800 * A local cache version number (passed via ``localversion``).
801 801 * All the arguments passed to the command.
802 802 * The media type used.
803 803 * Wire protocol version string.
804 804 * The repository path.
805 805 """
806 806 if not allargs:
807 807 raise error.ProgrammingError(
808 808 b'only allargs=True is currently supported'
809 809 )
810 810
811 811 if localversion is None:
812 812 raise error.ProgrammingError(b'must set localversion argument value')
813 813
814 814 def cachekeyfn(repo, proto, cacher, **args):
815 815 spec = COMMANDS[command]
816 816
817 817 # Commands that mutate the repo can not be cached.
818 818 if spec.permission == b'push':
819 819 return None
820 820
821 821 # TODO config option to disable caching.
822 822
823 823 # Our key derivation strategy is to construct a data structure
824 824 # holding everything that could influence cacheability and to hash
825 825 # the CBOR representation of that. Using CBOR seems like it might
826 826 # be overkill. However, simpler hashing mechanisms are prone to
827 827 # duplicate input issues. e.g. if you just concatenate two values,
828 828 # "foo"+"bar" is identical to "fo"+"obar". Using CBOR provides
829 829 # "padding" between values and prevents these problems.
830 830
831 831 # Seed the hash with various data.
832 832 state = {
833 833 # To invalidate all cache keys.
834 834 b'globalversion': GLOBAL_CACHE_VERSION,
835 835 # More granular cache key invalidation.
836 836 b'localversion': localversion,
837 837 # Cache keys are segmented by command.
838 838 b'command': command,
839 839 # Throw in the media type and API version strings so changes
840 840 # to exchange semantics invalid cache.
841 841 b'mediatype': FRAMINGTYPE,
842 842 b'version': HTTP_WIREPROTO_V2,
843 843 # So same requests for different repos don't share cache keys.
844 844 b'repo': repo.root,
845 845 }
846 846
847 847 # The arguments passed to us will have already been normalized.
848 848 # Default values will be set, etc. This is important because it
849 849 # means that it doesn't matter if clients send an explicit argument
850 850 # or rely on the default value: it will all normalize to the same
851 851 # set of arguments on the server and therefore the same cache key.
852 852 #
853 853 # Arguments by their very nature must support being encoded to CBOR.
854 854 # And the CBOR encoder is deterministic. So we hash the arguments
855 855 # by feeding the CBOR of their representation into the hasher.
856 856 if allargs:
857 857 state[b'args'] = pycompat.byteskwargs(args)
858 858
859 859 cacher.adjustcachekeystate(state)
860 860
861 861 hasher = hashutil.sha1()
862 862 for chunk in cborutil.streamencode(state):
863 863 hasher.update(chunk)
864 864
865 865 return pycompat.sysbytes(hasher.hexdigest())
866 866
867 867 return cachekeyfn
868 868
869 869
870 870 def makeresponsecacher(
871 871 repo, proto, command, args, objencoderfn, redirecttargets, redirecthashes
872 872 ):
873 873 """Construct a cacher for a cacheable command.
874 874
875 875 Returns an ``iwireprotocolcommandcacher`` instance.
876 876
877 877 Extensions can monkeypatch this function to provide custom caching
878 878 backends.
879 879 """
880 880 return None
881 881
882 882
883 883 def resolvenodes(repo, revisions):
884 884 """Resolve nodes from a revisions specifier data structure."""
885 885 cl = repo.changelog
886 886 clhasnode = cl.hasnode
887 887
888 888 seen = set()
889 889 nodes = []
890 890
891 891 if not isinstance(revisions, list):
892 892 raise error.WireprotoCommandError(
893 893 b'revisions must be defined as an array'
894 894 )
895 895
896 896 for spec in revisions:
897 897 if b'type' not in spec:
898 898 raise error.WireprotoCommandError(
899 899 b'type key not present in revision specifier'
900 900 )
901 901
902 902 typ = spec[b'type']
903 903
904 904 if typ == b'changesetexplicit':
905 905 if b'nodes' not in spec:
906 906 raise error.WireprotoCommandError(
907 907 b'nodes key not present in changesetexplicit revision '
908 908 b'specifier'
909 909 )
910 910
911 911 for node in spec[b'nodes']:
912 912 if node not in seen:
913 913 nodes.append(node)
914 914 seen.add(node)
915 915
916 916 elif typ == b'changesetexplicitdepth':
917 917 for key in (b'nodes', b'depth'):
918 918 if key not in spec:
919 919 raise error.WireprotoCommandError(
920 920 b'%s key not present in changesetexplicitdepth revision '
921 921 b'specifier',
922 922 (key,),
923 923 )
924 924
925 925 for rev in repo.revs(
926 926 b'ancestors(%ln, %s)', spec[b'nodes'], spec[b'depth'] - 1
927 927 ):
928 928 node = cl.node(rev)
929 929
930 930 if node not in seen:
931 931 nodes.append(node)
932 932 seen.add(node)
933 933
934 934 elif typ == b'changesetdagrange':
935 935 for key in (b'roots', b'heads'):
936 936 if key not in spec:
937 937 raise error.WireprotoCommandError(
938 938 b'%s key not present in changesetdagrange revision '
939 939 b'specifier',
940 940 (key,),
941 941 )
942 942
943 943 if not spec[b'heads']:
944 944 raise error.WireprotoCommandError(
945 945 b'heads key in changesetdagrange cannot be empty'
946 946 )
947 947
948 948 if spec[b'roots']:
949 949 common = [n for n in spec[b'roots'] if clhasnode(n)]
950 950 else:
951 951 common = [nullid]
952 952
953 953 for n in discovery.outgoing(repo, common, spec[b'heads']).missing:
954 954 if n not in seen:
955 955 nodes.append(n)
956 956 seen.add(n)
957 957
958 958 else:
959 959 raise error.WireprotoCommandError(
960 960 b'unknown revision specifier type: %s', (typ,)
961 961 )
962 962
963 963 return nodes
964 964
965 965
966 966 @wireprotocommand(b'branchmap', permission=b'pull')
967 967 def branchmapv2(repo, proto):
968 968 yield {
969 969 encoding.fromlocal(k): v
970 970 for k, v in pycompat.iteritems(repo.branchmap())
971 971 }
972 972
973 973
974 974 @wireprotocommand(b'capabilities', permission=b'pull')
975 975 def capabilitiesv2(repo, proto):
976 976 yield _capabilitiesv2(repo, proto)
977 977
978 978
979 979 @wireprotocommand(
980 980 b'changesetdata',
981 981 args={
982 982 b'revisions': {
983 983 b'type': b'list',
984 984 b'example': [
985 985 {
986 986 b'type': b'changesetexplicit',
987 987 b'nodes': [b'abcdef...'],
988 988 }
989 989 ],
990 990 },
991 991 b'fields': {
992 992 b'type': b'set',
993 993 b'default': set,
994 994 b'example': {b'parents', b'revision'},
995 995 b'validvalues': {b'bookmarks', b'parents', b'phase', b'revision'},
996 996 },
997 997 },
998 998 permission=b'pull',
999 999 )
1000 1000 def changesetdata(repo, proto, revisions, fields):
1001 1001 # TODO look for unknown fields and abort when they can't be serviced.
1002 1002 # This could probably be validated by dispatcher using validvalues.
1003 1003
1004 1004 cl = repo.changelog
1005 1005 outgoing = resolvenodes(repo, revisions)
1006 1006 publishing = repo.publishing()
1007 1007
1008 1008 if outgoing:
1009 1009 repo.hook(b'preoutgoing', throw=True, source=b'serve')
1010 1010
1011 1011 yield {
1012 1012 b'totalitems': len(outgoing),
1013 1013 }
1014 1014
1015 1015 # The phases of nodes already transferred to the client may have changed
1016 1016 # since the client last requested data. We send phase-only records
1017 1017 # for these revisions, if requested.
1018 1018 # TODO actually do this. We'll probably want to emit phase heads
1019 1019 # in the ancestry set of the outgoing revisions. This will ensure
1020 1020 # that phase updates within that set are seen.
1021 1021 if b'phase' in fields:
1022 1022 pass
1023 1023
1024 1024 nodebookmarks = {}
1025 1025 for mark, node in repo._bookmarks.items():
1026 1026 nodebookmarks.setdefault(node, set()).add(mark)
1027 1027
1028 1028 # It is already topologically sorted by revision number.
1029 1029 for node in outgoing:
1030 1030 d = {
1031 1031 b'node': node,
1032 1032 }
1033 1033
1034 1034 if b'parents' in fields:
1035 1035 d[b'parents'] = cl.parents(node)
1036 1036
1037 1037 if b'phase' in fields:
1038 1038 if publishing:
1039 1039 d[b'phase'] = b'public'
1040 1040 else:
1041 1041 ctx = repo[node]
1042 1042 d[b'phase'] = ctx.phasestr()
1043 1043
1044 1044 if b'bookmarks' in fields and node in nodebookmarks:
1045 1045 d[b'bookmarks'] = sorted(nodebookmarks[node])
1046 1046 del nodebookmarks[node]
1047 1047
1048 1048 followingmeta = []
1049 1049 followingdata = []
1050 1050
1051 1051 if b'revision' in fields:
1052 revisiondata = cl.rawdata(node)
1052 revisiondata = cl.revision(node)
1053 1053 followingmeta.append((b'revision', len(revisiondata)))
1054 1054 followingdata.append(revisiondata)
1055 1055
1056 1056 # TODO make it possible for extensions to wrap a function or register
1057 1057 # a handler to service custom fields.
1058 1058
1059 1059 if followingmeta:
1060 1060 d[b'fieldsfollowing'] = followingmeta
1061 1061
1062 1062 yield d
1063 1063
1064 1064 for extra in followingdata:
1065 1065 yield extra
1066 1066
1067 1067 # If requested, send bookmarks from nodes that didn't have revision
1068 1068 # data sent so receiver is aware of any bookmark updates.
1069 1069 if b'bookmarks' in fields:
1070 1070 for node, marks in sorted(pycompat.iteritems(nodebookmarks)):
1071 1071 yield {
1072 1072 b'node': node,
1073 1073 b'bookmarks': sorted(marks),
1074 1074 }
1075 1075
1076 1076
1077 1077 class FileAccessError(Exception):
1078 1078 """Represents an error accessing a specific file."""
1079 1079
1080 1080 def __init__(self, path, msg, args):
1081 1081 self.path = path
1082 1082 self.msg = msg
1083 1083 self.args = args
1084 1084
1085 1085
1086 1086 def getfilestore(repo, proto, path):
1087 1087 """Obtain a file storage object for use with wire protocol.
1088 1088
1089 1089 Exists as a standalone function so extensions can monkeypatch to add
1090 1090 access control.
1091 1091 """
1092 1092 # This seems to work even if the file doesn't exist. So catch
1093 1093 # "empty" files and return an error.
1094 1094 fl = repo.file(path)
1095 1095
1096 1096 if not len(fl):
1097 1097 raise FileAccessError(path, b'unknown file: %s', (path,))
1098 1098
1099 1099 return fl
1100 1100
1101 1101
1102 1102 def emitfilerevisions(repo, path, revisions, linknodes, fields):
1103 1103 for revision in revisions:
1104 1104 d = {
1105 1105 b'node': revision.node,
1106 1106 }
1107 1107
1108 1108 if b'parents' in fields:
1109 1109 d[b'parents'] = [revision.p1node, revision.p2node]
1110 1110
1111 1111 if b'linknode' in fields:
1112 1112 d[b'linknode'] = linknodes[revision.node]
1113 1113
1114 1114 followingmeta = []
1115 1115 followingdata = []
1116 1116
1117 1117 if b'revision' in fields:
1118 1118 if revision.revision is not None:
1119 1119 followingmeta.append((b'revision', len(revision.revision)))
1120 1120 followingdata.append(revision.revision)
1121 1121 else:
1122 1122 d[b'deltabasenode'] = revision.basenode
1123 1123 followingmeta.append((b'delta', len(revision.delta)))
1124 1124 followingdata.append(revision.delta)
1125 1125
1126 1126 if followingmeta:
1127 1127 d[b'fieldsfollowing'] = followingmeta
1128 1128
1129 1129 yield d
1130 1130
1131 1131 for extra in followingdata:
1132 1132 yield extra
1133 1133
1134 1134
1135 1135 def makefilematcher(repo, pathfilter):
1136 1136 """Construct a matcher from a path filter dict."""
1137 1137
1138 1138 # Validate values.
1139 1139 if pathfilter:
1140 1140 for key in (b'include', b'exclude'):
1141 1141 for pattern in pathfilter.get(key, []):
1142 1142 if not pattern.startswith((b'path:', b'rootfilesin:')):
1143 1143 raise error.WireprotoCommandError(
1144 1144 b'%s pattern must begin with `path:` or `rootfilesin:`; '
1145 1145 b'got %s',
1146 1146 (key, pattern),
1147 1147 )
1148 1148
1149 1149 if pathfilter:
1150 1150 matcher = matchmod.match(
1151 1151 repo.root,
1152 1152 b'',
1153 1153 include=pathfilter.get(b'include', []),
1154 1154 exclude=pathfilter.get(b'exclude', []),
1155 1155 )
1156 1156 else:
1157 1157 matcher = matchmod.match(repo.root, b'')
1158 1158
1159 1159 # Requested patterns could include files not in the local store. So
1160 1160 # filter those out.
1161 1161 return repo.narrowmatch(matcher)
1162 1162
1163 1163
1164 1164 @wireprotocommand(
1165 1165 b'filedata',
1166 1166 args={
1167 1167 b'haveparents': {
1168 1168 b'type': b'bool',
1169 1169 b'default': lambda: False,
1170 1170 b'example': True,
1171 1171 },
1172 1172 b'nodes': {
1173 1173 b'type': b'list',
1174 1174 b'example': [b'0123456...'],
1175 1175 },
1176 1176 b'fields': {
1177 1177 b'type': b'set',
1178 1178 b'default': set,
1179 1179 b'example': {b'parents', b'revision'},
1180 1180 b'validvalues': {b'parents', b'revision', b'linknode'},
1181 1181 },
1182 1182 b'path': {
1183 1183 b'type': b'bytes',
1184 1184 b'example': b'foo.txt',
1185 1185 },
1186 1186 },
1187 1187 permission=b'pull',
1188 1188 # TODO censoring a file revision won't invalidate the cache.
1189 1189 # Figure out a way to take censoring into account when deriving
1190 1190 # the cache key.
1191 1191 cachekeyfn=makecommandcachekeyfn(b'filedata', 1, allargs=True),
1192 1192 )
1193 1193 def filedata(repo, proto, haveparents, nodes, fields, path):
1194 1194 # TODO this API allows access to file revisions that are attached to
1195 1195 # secret changesets. filesdata does not have this problem. Maybe this
1196 1196 # API should be deleted?
1197 1197
1198 1198 try:
1199 1199 # Extensions may wish to access the protocol handler.
1200 1200 store = getfilestore(repo, proto, path)
1201 1201 except FileAccessError as e:
1202 1202 raise error.WireprotoCommandError(e.msg, e.args)
1203 1203
1204 1204 clnode = repo.changelog.node
1205 1205 linknodes = {}
1206 1206
1207 1207 # Validate requested nodes.
1208 1208 for node in nodes:
1209 1209 try:
1210 1210 store.rev(node)
1211 1211 except error.LookupError:
1212 1212 raise error.WireprotoCommandError(
1213 1213 b'unknown file node: %s', (hex(node),)
1214 1214 )
1215 1215
1216 1216 # TODO by creating the filectx against a specific file revision
1217 1217 # instead of changeset, linkrev() is always used. This is wrong for
1218 1218 # cases where linkrev() may refer to a hidden changeset. But since this
1219 1219 # API doesn't know anything about changesets, we're not sure how to
1220 1220 # disambiguate the linknode. Perhaps we should delete this API?
1221 1221 fctx = repo.filectx(path, fileid=node)
1222 1222 linknodes[node] = clnode(fctx.introrev())
1223 1223
1224 1224 revisions = store.emitrevisions(
1225 1225 nodes,
1226 1226 revisiondata=b'revision' in fields,
1227 1227 assumehaveparentrevisions=haveparents,
1228 1228 )
1229 1229
1230 1230 yield {
1231 1231 b'totalitems': len(nodes),
1232 1232 }
1233 1233
1234 1234 for o in emitfilerevisions(repo, path, revisions, linknodes, fields):
1235 1235 yield o
1236 1236
1237 1237
1238 1238 def filesdatacapabilities(repo, proto):
1239 1239 batchsize = repo.ui.configint(
1240 1240 b'experimental', b'server.filesdata.recommended-batch-size'
1241 1241 )
1242 1242 return {
1243 1243 b'recommendedbatchsize': batchsize,
1244 1244 }
1245 1245
1246 1246
1247 1247 @wireprotocommand(
1248 1248 b'filesdata',
1249 1249 args={
1250 1250 b'haveparents': {
1251 1251 b'type': b'bool',
1252 1252 b'default': lambda: False,
1253 1253 b'example': True,
1254 1254 },
1255 1255 b'fields': {
1256 1256 b'type': b'set',
1257 1257 b'default': set,
1258 1258 b'example': {b'parents', b'revision'},
1259 1259 b'validvalues': {
1260 1260 b'firstchangeset',
1261 1261 b'linknode',
1262 1262 b'parents',
1263 1263 b'revision',
1264 1264 },
1265 1265 },
1266 1266 b'pathfilter': {
1267 1267 b'type': b'dict',
1268 1268 b'default': lambda: None,
1269 1269 b'example': {b'include': [b'path:tests']},
1270 1270 },
1271 1271 b'revisions': {
1272 1272 b'type': b'list',
1273 1273 b'example': [
1274 1274 {
1275 1275 b'type': b'changesetexplicit',
1276 1276 b'nodes': [b'abcdef...'],
1277 1277 }
1278 1278 ],
1279 1279 },
1280 1280 },
1281 1281 permission=b'pull',
1282 1282 # TODO censoring a file revision won't invalidate the cache.
1283 1283 # Figure out a way to take censoring into account when deriving
1284 1284 # the cache key.
1285 1285 cachekeyfn=makecommandcachekeyfn(b'filesdata', 1, allargs=True),
1286 1286 extracapabilitiesfn=filesdatacapabilities,
1287 1287 )
1288 1288 def filesdata(repo, proto, haveparents, fields, pathfilter, revisions):
1289 1289 # TODO This should operate on a repo that exposes obsolete changesets. There
1290 1290 # is a race between a client making a push that obsoletes a changeset and
1291 1291 # another client fetching files data for that changeset. If a client has a
1292 1292 # changeset, it should probably be allowed to access files data for that
1293 1293 # changeset.
1294 1294
1295 1295 outgoing = resolvenodes(repo, revisions)
1296 1296 filematcher = makefilematcher(repo, pathfilter)
1297 1297
1298 1298 # path -> {fnode: linknode}
1299 1299 fnodes = collections.defaultdict(dict)
1300 1300
1301 1301 # We collect the set of relevant file revisions by iterating the changeset
1302 1302 # revisions and either walking the set of files recorded in the changeset
1303 1303 # or by walking the manifest at that revision. There is probably room for a
1304 1304 # storage-level API to request this data, as it can be expensive to compute
1305 1305 # and would benefit from caching or alternate storage from what revlogs
1306 1306 # provide.
1307 1307 for node in outgoing:
1308 1308 ctx = repo[node]
1309 1309 mctx = ctx.manifestctx()
1310 1310 md = mctx.read()
1311 1311
1312 1312 if haveparents:
1313 1313 checkpaths = ctx.files()
1314 1314 else:
1315 1315 checkpaths = md.keys()
1316 1316
1317 1317 for path in checkpaths:
1318 1318 fnode = md[path]
1319 1319
1320 1320 if path in fnodes and fnode in fnodes[path]:
1321 1321 continue
1322 1322
1323 1323 if not filematcher(path):
1324 1324 continue
1325 1325
1326 1326 fnodes[path].setdefault(fnode, node)
1327 1327
1328 1328 yield {
1329 1329 b'totalpaths': len(fnodes),
1330 1330 b'totalitems': sum(len(v) for v in fnodes.values()),
1331 1331 }
1332 1332
1333 1333 for path, filenodes in sorted(fnodes.items()):
1334 1334 try:
1335 1335 store = getfilestore(repo, proto, path)
1336 1336 except FileAccessError as e:
1337 1337 raise error.WireprotoCommandError(e.msg, e.args)
1338 1338
1339 1339 yield {
1340 1340 b'path': path,
1341 1341 b'totalitems': len(filenodes),
1342 1342 }
1343 1343
1344 1344 revisions = store.emitrevisions(
1345 1345 filenodes.keys(),
1346 1346 revisiondata=b'revision' in fields,
1347 1347 assumehaveparentrevisions=haveparents,
1348 1348 )
1349 1349
1350 1350 for o in emitfilerevisions(repo, path, revisions, filenodes, fields):
1351 1351 yield o
1352 1352
1353 1353
1354 1354 @wireprotocommand(
1355 1355 b'heads',
1356 1356 args={
1357 1357 b'publiconly': {
1358 1358 b'type': b'bool',
1359 1359 b'default': lambda: False,
1360 1360 b'example': False,
1361 1361 },
1362 1362 },
1363 1363 permission=b'pull',
1364 1364 )
1365 1365 def headsv2(repo, proto, publiconly):
1366 1366 if publiconly:
1367 1367 repo = repo.filtered(b'immutable')
1368 1368
1369 1369 yield repo.heads()
1370 1370
1371 1371
1372 1372 @wireprotocommand(
1373 1373 b'known',
1374 1374 args={
1375 1375 b'nodes': {
1376 1376 b'type': b'list',
1377 1377 b'default': list,
1378 1378 b'example': [b'deadbeef'],
1379 1379 },
1380 1380 },
1381 1381 permission=b'pull',
1382 1382 )
1383 1383 def knownv2(repo, proto, nodes):
1384 1384 result = b''.join(b'1' if n else b'0' for n in repo.known(nodes))
1385 1385 yield result
1386 1386
1387 1387
1388 1388 @wireprotocommand(
1389 1389 b'listkeys',
1390 1390 args={
1391 1391 b'namespace': {
1392 1392 b'type': b'bytes',
1393 1393 b'example': b'ns',
1394 1394 },
1395 1395 },
1396 1396 permission=b'pull',
1397 1397 )
1398 1398 def listkeysv2(repo, proto, namespace):
1399 1399 keys = repo.listkeys(encoding.tolocal(namespace))
1400 1400 keys = {
1401 1401 encoding.fromlocal(k): encoding.fromlocal(v)
1402 1402 for k, v in pycompat.iteritems(keys)
1403 1403 }
1404 1404
1405 1405 yield keys
1406 1406
1407 1407
1408 1408 @wireprotocommand(
1409 1409 b'lookup',
1410 1410 args={
1411 1411 b'key': {
1412 1412 b'type': b'bytes',
1413 1413 b'example': b'foo',
1414 1414 },
1415 1415 },
1416 1416 permission=b'pull',
1417 1417 )
1418 1418 def lookupv2(repo, proto, key):
1419 1419 key = encoding.tolocal(key)
1420 1420
1421 1421 # TODO handle exception.
1422 1422 node = repo.lookup(key)
1423 1423
1424 1424 yield node
1425 1425
1426 1426
1427 1427 def manifestdatacapabilities(repo, proto):
1428 1428 batchsize = repo.ui.configint(
1429 1429 b'experimental', b'server.manifestdata.recommended-batch-size'
1430 1430 )
1431 1431
1432 1432 return {
1433 1433 b'recommendedbatchsize': batchsize,
1434 1434 }
1435 1435
1436 1436
1437 1437 @wireprotocommand(
1438 1438 b'manifestdata',
1439 1439 args={
1440 1440 b'nodes': {
1441 1441 b'type': b'list',
1442 1442 b'example': [b'0123456...'],
1443 1443 },
1444 1444 b'haveparents': {
1445 1445 b'type': b'bool',
1446 1446 b'default': lambda: False,
1447 1447 b'example': True,
1448 1448 },
1449 1449 b'fields': {
1450 1450 b'type': b'set',
1451 1451 b'default': set,
1452 1452 b'example': {b'parents', b'revision'},
1453 1453 b'validvalues': {b'parents', b'revision'},
1454 1454 },
1455 1455 b'tree': {
1456 1456 b'type': b'bytes',
1457 1457 b'example': b'',
1458 1458 },
1459 1459 },
1460 1460 permission=b'pull',
1461 1461 cachekeyfn=makecommandcachekeyfn(b'manifestdata', 1, allargs=True),
1462 1462 extracapabilitiesfn=manifestdatacapabilities,
1463 1463 )
1464 1464 def manifestdata(repo, proto, haveparents, nodes, fields, tree):
1465 1465 store = repo.manifestlog.getstorage(tree)
1466 1466
1467 1467 # Validate the node is known and abort on unknown revisions.
1468 1468 for node in nodes:
1469 1469 try:
1470 1470 store.rev(node)
1471 1471 except error.LookupError:
1472 1472 raise error.WireprotoCommandError(b'unknown node: %s', (node,))
1473 1473
1474 1474 revisions = store.emitrevisions(
1475 1475 nodes,
1476 1476 revisiondata=b'revision' in fields,
1477 1477 assumehaveparentrevisions=haveparents,
1478 1478 )
1479 1479
1480 1480 yield {
1481 1481 b'totalitems': len(nodes),
1482 1482 }
1483 1483
1484 1484 for revision in revisions:
1485 1485 d = {
1486 1486 b'node': revision.node,
1487 1487 }
1488 1488
1489 1489 if b'parents' in fields:
1490 1490 d[b'parents'] = [revision.p1node, revision.p2node]
1491 1491
1492 1492 followingmeta = []
1493 1493 followingdata = []
1494 1494
1495 1495 if b'revision' in fields:
1496 1496 if revision.revision is not None:
1497 1497 followingmeta.append((b'revision', len(revision.revision)))
1498 1498 followingdata.append(revision.revision)
1499 1499 else:
1500 1500 d[b'deltabasenode'] = revision.basenode
1501 1501 followingmeta.append((b'delta', len(revision.delta)))
1502 1502 followingdata.append(revision.delta)
1503 1503
1504 1504 if followingmeta:
1505 1505 d[b'fieldsfollowing'] = followingmeta
1506 1506
1507 1507 yield d
1508 1508
1509 1509 for extra in followingdata:
1510 1510 yield extra
1511 1511
1512 1512
1513 1513 @wireprotocommand(
1514 1514 b'pushkey',
1515 1515 args={
1516 1516 b'namespace': {
1517 1517 b'type': b'bytes',
1518 1518 b'example': b'ns',
1519 1519 },
1520 1520 b'key': {
1521 1521 b'type': b'bytes',
1522 1522 b'example': b'key',
1523 1523 },
1524 1524 b'old': {
1525 1525 b'type': b'bytes',
1526 1526 b'example': b'old',
1527 1527 },
1528 1528 b'new': {
1529 1529 b'type': b'bytes',
1530 1530 b'example': b'new',
1531 1531 },
1532 1532 },
1533 1533 permission=b'push',
1534 1534 )
1535 1535 def pushkeyv2(repo, proto, namespace, key, old, new):
1536 1536 # TODO handle ui output redirection
1537 1537 yield repo.pushkey(
1538 1538 encoding.tolocal(namespace),
1539 1539 encoding.tolocal(key),
1540 1540 encoding.tolocal(old),
1541 1541 encoding.tolocal(new),
1542 1542 )
1543 1543
1544 1544
1545 1545 @wireprotocommand(
1546 1546 b'rawstorefiledata',
1547 1547 args={
1548 1548 b'files': {
1549 1549 b'type': b'list',
1550 1550 b'example': [b'changelog', b'manifestlog'],
1551 1551 },
1552 1552 b'pathfilter': {
1553 1553 b'type': b'list',
1554 1554 b'default': lambda: None,
1555 1555 b'example': {b'include': [b'path:tests']},
1556 1556 },
1557 1557 },
1558 1558 permission=b'pull',
1559 1559 )
1560 1560 def rawstorefiledata(repo, proto, files, pathfilter):
1561 1561 if not streamclone.allowservergeneration(repo):
1562 1562 raise error.WireprotoCommandError(b'stream clone is disabled')
1563 1563
1564 1564 # TODO support dynamically advertising what store files "sets" are
1565 1565 # available. For now, we support changelog, manifestlog, and files.
1566 1566 files = set(files)
1567 1567 allowedfiles = {b'changelog', b'manifestlog'}
1568 1568
1569 1569 unsupported = files - allowedfiles
1570 1570 if unsupported:
1571 1571 raise error.WireprotoCommandError(
1572 1572 b'unknown file type: %s', (b', '.join(sorted(unsupported)),)
1573 1573 )
1574 1574
1575 1575 with repo.lock():
1576 1576 topfiles = list(repo.store.topfiles())
1577 1577
1578 1578 sendfiles = []
1579 1579 totalsize = 0
1580 1580
1581 1581 # TODO this is a bunch of storage layer interface abstractions because
1582 1582 # it assumes revlogs.
1583 1583 for name, encodedname, size in topfiles:
1584 1584 if b'changelog' in files and name.startswith(b'00changelog'):
1585 1585 pass
1586 1586 elif b'manifestlog' in files and name.startswith(b'00manifest'):
1587 1587 pass
1588 1588 else:
1589 1589 continue
1590 1590
1591 1591 sendfiles.append((b'store', name, size))
1592 1592 totalsize += size
1593 1593
1594 1594 yield {
1595 1595 b'filecount': len(sendfiles),
1596 1596 b'totalsize': totalsize,
1597 1597 }
1598 1598
1599 1599 for location, name, size in sendfiles:
1600 1600 yield {
1601 1601 b'location': location,
1602 1602 b'path': name,
1603 1603 b'size': size,
1604 1604 }
1605 1605
1606 1606 # We have to use a closure for this to ensure the context manager is
1607 1607 # closed only after sending the final chunk.
1608 1608 def getfiledata():
1609 1609 with repo.svfs(name, b'rb', auditpath=False) as fh:
1610 1610 for chunk in util.filechunkiter(fh, limit=size):
1611 1611 yield chunk
1612 1612
1613 1613 yield wireprototypes.indefinitebytestringresponse(getfiledata())
General Comments 0
You need to be logged in to leave comments. Login now