##// END OF EJS Templates
wireprotov2server: let repo.narrowmatch(match) do matcher intersection...
Martin von Zweigbergk -
r40685:f83cea7f default
parent child Browse files
Show More
@@ -1,1460 +1,1460
1 1 # Copyright 21 May 2005 - (c) 2005 Jake Edge <jake@edge2.net>
2 2 # Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
3 3 #
4 4 # This software may be used and distributed according to the terms of the
5 5 # GNU General Public License version 2 or any later version.
6 6
7 7 from __future__ import absolute_import
8 8
9 9 import collections
10 10 import contextlib
11 11 import hashlib
12 12
13 13 from .i18n import _
14 14 from .node import (
15 15 hex,
16 16 nullid,
17 17 )
18 18 from . import (
19 19 discovery,
20 20 encoding,
21 21 error,
22 22 match as matchmod,
23 23 narrowspec,
24 24 pycompat,
25 25 streamclone,
26 26 util,
27 27 wireprotoframing,
28 28 wireprototypes,
29 29 )
30 30 from .utils import (
31 31 cborutil,
32 32 interfaceutil,
33 33 stringutil,
34 34 )
35 35
36 36 FRAMINGTYPE = b'application/mercurial-exp-framing-0006'
37 37
38 38 HTTP_WIREPROTO_V2 = wireprototypes.HTTP_WIREPROTO_V2
39 39
40 40 COMMANDS = wireprototypes.commanddict()
41 41
42 42 # Value inserted into cache key computation function. Change the value to
43 43 # force new cache keys for every command request. This should be done when
44 44 # there is a change to how caching works, etc.
45 45 GLOBAL_CACHE_VERSION = 1
46 46
47 47 def handlehttpv2request(rctx, req, res, checkperm, urlparts):
48 48 from .hgweb import common as hgwebcommon
49 49
50 50 # URL space looks like: <permissions>/<command>, where <permission> can
51 51 # be ``ro`` or ``rw`` to signal read-only or read-write, respectively.
52 52
53 53 # Root URL does nothing meaningful... yet.
54 54 if not urlparts:
55 55 res.status = b'200 OK'
56 56 res.headers[b'Content-Type'] = b'text/plain'
57 57 res.setbodybytes(_('HTTP version 2 API handler'))
58 58 return
59 59
60 60 if len(urlparts) == 1:
61 61 res.status = b'404 Not Found'
62 62 res.headers[b'Content-Type'] = b'text/plain'
63 63 res.setbodybytes(_('do not know how to process %s\n') %
64 64 req.dispatchpath)
65 65 return
66 66
67 67 permission, command = urlparts[0:2]
68 68
69 69 if permission not in (b'ro', b'rw'):
70 70 res.status = b'404 Not Found'
71 71 res.headers[b'Content-Type'] = b'text/plain'
72 72 res.setbodybytes(_('unknown permission: %s') % permission)
73 73 return
74 74
75 75 if req.method != 'POST':
76 76 res.status = b'405 Method Not Allowed'
77 77 res.headers[b'Allow'] = b'POST'
78 78 res.setbodybytes(_('commands require POST requests'))
79 79 return
80 80
81 81 # At some point we'll want to use our own API instead of recycling the
82 82 # behavior of version 1 of the wire protocol...
83 83 # TODO return reasonable responses - not responses that overload the
84 84 # HTTP status line message for error reporting.
85 85 try:
86 86 checkperm(rctx, req, 'pull' if permission == b'ro' else 'push')
87 87 except hgwebcommon.ErrorResponse as e:
88 88 res.status = hgwebcommon.statusmessage(e.code, pycompat.bytestr(e))
89 89 for k, v in e.headers:
90 90 res.headers[k] = v
91 91 res.setbodybytes('permission denied')
92 92 return
93 93
94 94 # We have a special endpoint to reflect the request back at the client.
95 95 if command == b'debugreflect':
96 96 _processhttpv2reflectrequest(rctx.repo.ui, rctx.repo, req, res)
97 97 return
98 98
99 99 # Extra commands that we handle that aren't really wire protocol
100 100 # commands. Think extra hard before making this hackery available to
101 101 # extension.
102 102 extracommands = {'multirequest'}
103 103
104 104 if command not in COMMANDS and command not in extracommands:
105 105 res.status = b'404 Not Found'
106 106 res.headers[b'Content-Type'] = b'text/plain'
107 107 res.setbodybytes(_('unknown wire protocol command: %s\n') % command)
108 108 return
109 109
110 110 repo = rctx.repo
111 111 ui = repo.ui
112 112
113 113 proto = httpv2protocolhandler(req, ui)
114 114
115 115 if (not COMMANDS.commandavailable(command, proto)
116 116 and command not in extracommands):
117 117 res.status = b'404 Not Found'
118 118 res.headers[b'Content-Type'] = b'text/plain'
119 119 res.setbodybytes(_('invalid wire protocol command: %s') % command)
120 120 return
121 121
122 122 # TODO consider cases where proxies may add additional Accept headers.
123 123 if req.headers.get(b'Accept') != FRAMINGTYPE:
124 124 res.status = b'406 Not Acceptable'
125 125 res.headers[b'Content-Type'] = b'text/plain'
126 126 res.setbodybytes(_('client MUST specify Accept header with value: %s\n')
127 127 % FRAMINGTYPE)
128 128 return
129 129
130 130 if req.headers.get(b'Content-Type') != FRAMINGTYPE:
131 131 res.status = b'415 Unsupported Media Type'
132 132 # TODO we should send a response with appropriate media type,
133 133 # since client does Accept it.
134 134 res.headers[b'Content-Type'] = b'text/plain'
135 135 res.setbodybytes(_('client MUST send Content-Type header with '
136 136 'value: %s\n') % FRAMINGTYPE)
137 137 return
138 138
139 139 _processhttpv2request(ui, repo, req, res, permission, command, proto)
140 140
141 141 def _processhttpv2reflectrequest(ui, repo, req, res):
142 142 """Reads unified frame protocol request and dumps out state to client.
143 143
144 144 This special endpoint can be used to help debug the wire protocol.
145 145
146 146 Instead of routing the request through the normal dispatch mechanism,
147 147 we instead read all frames, decode them, and feed them into our state
148 148 tracker. We then dump the log of all that activity back out to the
149 149 client.
150 150 """
151 151 import json
152 152
153 153 # Reflection APIs have a history of being abused, accidentally disclosing
154 154 # sensitive data, etc. So we have a config knob.
155 155 if not ui.configbool('experimental', 'web.api.debugreflect'):
156 156 res.status = b'404 Not Found'
157 157 res.headers[b'Content-Type'] = b'text/plain'
158 158 res.setbodybytes(_('debugreflect service not available'))
159 159 return
160 160
161 161 # We assume we have a unified framing protocol request body.
162 162
163 163 reactor = wireprotoframing.serverreactor(ui)
164 164 states = []
165 165
166 166 while True:
167 167 frame = wireprotoframing.readframe(req.bodyfh)
168 168
169 169 if not frame:
170 170 states.append(b'received: <no frame>')
171 171 break
172 172
173 173 states.append(b'received: %d %d %d %s' % (frame.typeid, frame.flags,
174 174 frame.requestid,
175 175 frame.payload))
176 176
177 177 action, meta = reactor.onframerecv(frame)
178 178 states.append(json.dumps((action, meta), sort_keys=True,
179 179 separators=(', ', ': ')))
180 180
181 181 action, meta = reactor.oninputeof()
182 182 meta['action'] = action
183 183 states.append(json.dumps(meta, sort_keys=True, separators=(', ',': ')))
184 184
185 185 res.status = b'200 OK'
186 186 res.headers[b'Content-Type'] = b'text/plain'
187 187 res.setbodybytes(b'\n'.join(states))
188 188
189 189 def _processhttpv2request(ui, repo, req, res, authedperm, reqcommand, proto):
190 190 """Post-validation handler for HTTPv2 requests.
191 191
192 192 Called when the HTTP request contains unified frame-based protocol
193 193 frames for evaluation.
194 194 """
195 195 # TODO Some HTTP clients are full duplex and can receive data before
196 196 # the entire request is transmitted. Figure out a way to indicate support
197 197 # for that so we can opt into full duplex mode.
198 198 reactor = wireprotoframing.serverreactor(ui, deferoutput=True)
199 199 seencommand = False
200 200
201 201 outstream = None
202 202
203 203 while True:
204 204 frame = wireprotoframing.readframe(req.bodyfh)
205 205 if not frame:
206 206 break
207 207
208 208 action, meta = reactor.onframerecv(frame)
209 209
210 210 if action == 'wantframe':
211 211 # Need more data before we can do anything.
212 212 continue
213 213 elif action == 'runcommand':
214 214 # Defer creating output stream because we need to wait for
215 215 # protocol settings frames so proper encoding can be applied.
216 216 if not outstream:
217 217 outstream = reactor.makeoutputstream()
218 218
219 219 sentoutput = _httpv2runcommand(ui, repo, req, res, authedperm,
220 220 reqcommand, reactor, outstream,
221 221 meta, issubsequent=seencommand)
222 222
223 223 if sentoutput:
224 224 return
225 225
226 226 seencommand = True
227 227
228 228 elif action == 'error':
229 229 # TODO define proper error mechanism.
230 230 res.status = b'200 OK'
231 231 res.headers[b'Content-Type'] = b'text/plain'
232 232 res.setbodybytes(meta['message'] + b'\n')
233 233 return
234 234 else:
235 235 raise error.ProgrammingError(
236 236 'unhandled action from frame processor: %s' % action)
237 237
238 238 action, meta = reactor.oninputeof()
239 239 if action == 'sendframes':
240 240 # We assume we haven't started sending the response yet. If we're
241 241 # wrong, the response type will raise an exception.
242 242 res.status = b'200 OK'
243 243 res.headers[b'Content-Type'] = FRAMINGTYPE
244 244 res.setbodygen(meta['framegen'])
245 245 elif action == 'noop':
246 246 pass
247 247 else:
248 248 raise error.ProgrammingError('unhandled action from frame processor: %s'
249 249 % action)
250 250
251 251 def _httpv2runcommand(ui, repo, req, res, authedperm, reqcommand, reactor,
252 252 outstream, command, issubsequent):
253 253 """Dispatch a wire protocol command made from HTTPv2 requests.
254 254
255 255 The authenticated permission (``authedperm``) along with the original
256 256 command from the URL (``reqcommand``) are passed in.
257 257 """
258 258 # We already validated that the session has permissions to perform the
259 259 # actions in ``authedperm``. In the unified frame protocol, the canonical
260 260 # command to run is expressed in a frame. However, the URL also requested
261 261 # to run a specific command. We need to be careful that the command we
262 262 # run doesn't have permissions requirements greater than what was granted
263 263 # by ``authedperm``.
264 264 #
265 265 # Our rule for this is we only allow one command per HTTP request and
266 266 # that command must match the command in the URL. However, we make
267 267 # an exception for the ``multirequest`` URL. This URL is allowed to
268 268 # execute multiple commands. We double check permissions of each command
269 269 # as it is invoked to ensure there is no privilege escalation.
270 270 # TODO consider allowing multiple commands to regular command URLs
271 271 # iff each command is the same.
272 272
273 273 proto = httpv2protocolhandler(req, ui, args=command['args'])
274 274
275 275 if reqcommand == b'multirequest':
276 276 if not COMMANDS.commandavailable(command['command'], proto):
277 277 # TODO proper error mechanism
278 278 res.status = b'200 OK'
279 279 res.headers[b'Content-Type'] = b'text/plain'
280 280 res.setbodybytes(_('wire protocol command not available: %s') %
281 281 command['command'])
282 282 return True
283 283
284 284 # TODO don't use assert here, since it may be elided by -O.
285 285 assert authedperm in (b'ro', b'rw')
286 286 wirecommand = COMMANDS[command['command']]
287 287 assert wirecommand.permission in ('push', 'pull')
288 288
289 289 if authedperm == b'ro' and wirecommand.permission != 'pull':
290 290 # TODO proper error mechanism
291 291 res.status = b'403 Forbidden'
292 292 res.headers[b'Content-Type'] = b'text/plain'
293 293 res.setbodybytes(_('insufficient permissions to execute '
294 294 'command: %s') % command['command'])
295 295 return True
296 296
297 297 # TODO should we also call checkperm() here? Maybe not if we're going
298 298 # to overhaul that API. The granted scope from the URL check should
299 299 # be good enough.
300 300
301 301 else:
302 302 # Don't allow multiple commands outside of ``multirequest`` URL.
303 303 if issubsequent:
304 304 # TODO proper error mechanism
305 305 res.status = b'200 OK'
306 306 res.headers[b'Content-Type'] = b'text/plain'
307 307 res.setbodybytes(_('multiple commands cannot be issued to this '
308 308 'URL'))
309 309 return True
310 310
311 311 if reqcommand != command['command']:
312 312 # TODO define proper error mechanism
313 313 res.status = b'200 OK'
314 314 res.headers[b'Content-Type'] = b'text/plain'
315 315 res.setbodybytes(_('command in frame must match command in URL'))
316 316 return True
317 317
318 318 res.status = b'200 OK'
319 319 res.headers[b'Content-Type'] = FRAMINGTYPE
320 320
321 321 try:
322 322 objs = dispatch(repo, proto, command['command'], command['redirect'])
323 323
324 324 action, meta = reactor.oncommandresponsereadyobjects(
325 325 outstream, command['requestid'], objs)
326 326
327 327 except error.WireprotoCommandError as e:
328 328 action, meta = reactor.oncommanderror(
329 329 outstream, command['requestid'], e.message, e.messageargs)
330 330
331 331 except Exception as e:
332 332 action, meta = reactor.onservererror(
333 333 outstream, command['requestid'],
334 334 _('exception when invoking command: %s') %
335 335 stringutil.forcebytestr(e))
336 336
337 337 if action == 'sendframes':
338 338 res.setbodygen(meta['framegen'])
339 339 return True
340 340 elif action == 'noop':
341 341 return False
342 342 else:
343 343 raise error.ProgrammingError('unhandled event from reactor: %s' %
344 344 action)
345 345
346 346 def getdispatchrepo(repo, proto, command):
347 347 return repo.filtered('served')
348 348
349 349 def dispatch(repo, proto, command, redirect):
350 350 """Run a wire protocol command.
351 351
352 352 Returns an iterable of objects that will be sent to the client.
353 353 """
354 354 repo = getdispatchrepo(repo, proto, command)
355 355
356 356 entry = COMMANDS[command]
357 357 func = entry.func
358 358 spec = entry.args
359 359
360 360 args = proto.getargs(spec)
361 361
362 362 # There is some duplicate boilerplate code here for calling the command and
363 363 # emitting objects. It is either that or a lot of indented code that looks
364 364 # like a pyramid (since there are a lot of code paths that result in not
365 365 # using the cacher).
366 366 callcommand = lambda: func(repo, proto, **pycompat.strkwargs(args))
367 367
368 368 # Request is not cacheable. Don't bother instantiating a cacher.
369 369 if not entry.cachekeyfn:
370 370 for o in callcommand():
371 371 yield o
372 372 return
373 373
374 374 if redirect:
375 375 redirecttargets = redirect[b'targets']
376 376 redirecthashes = redirect[b'hashes']
377 377 else:
378 378 redirecttargets = []
379 379 redirecthashes = []
380 380
381 381 cacher = makeresponsecacher(repo, proto, command, args,
382 382 cborutil.streamencode,
383 383 redirecttargets=redirecttargets,
384 384 redirecthashes=redirecthashes)
385 385
386 386 # But we have no cacher. Do default handling.
387 387 if not cacher:
388 388 for o in callcommand():
389 389 yield o
390 390 return
391 391
392 392 with cacher:
393 393 cachekey = entry.cachekeyfn(repo, proto, cacher, **args)
394 394
395 395 # No cache key or the cacher doesn't like it. Do default handling.
396 396 if cachekey is None or not cacher.setcachekey(cachekey):
397 397 for o in callcommand():
398 398 yield o
399 399 return
400 400
401 401 # Serve it from the cache, if possible.
402 402 cached = cacher.lookup()
403 403
404 404 if cached:
405 405 for o in cached['objs']:
406 406 yield o
407 407 return
408 408
409 409 # Else call the command and feed its output into the cacher, allowing
410 410 # the cacher to buffer/mutate objects as it desires.
411 411 for o in callcommand():
412 412 for o in cacher.onobject(o):
413 413 yield o
414 414
415 415 for o in cacher.onfinished():
416 416 yield o
417 417
418 418 @interfaceutil.implementer(wireprototypes.baseprotocolhandler)
419 419 class httpv2protocolhandler(object):
420 420 def __init__(self, req, ui, args=None):
421 421 self._req = req
422 422 self._ui = ui
423 423 self._args = args
424 424
425 425 @property
426 426 def name(self):
427 427 return HTTP_WIREPROTO_V2
428 428
429 429 def getargs(self, args):
430 430 # First look for args that were passed but aren't registered on this
431 431 # command.
432 432 extra = set(self._args) - set(args)
433 433 if extra:
434 434 raise error.WireprotoCommandError(
435 435 'unsupported argument to command: %s' %
436 436 ', '.join(sorted(extra)))
437 437
438 438 # And look for required arguments that are missing.
439 439 missing = {a for a in args if args[a]['required']} - set(self._args)
440 440
441 441 if missing:
442 442 raise error.WireprotoCommandError(
443 443 'missing required arguments: %s' % ', '.join(sorted(missing)))
444 444
445 445 # Now derive the arguments to pass to the command, taking into
446 446 # account the arguments specified by the client.
447 447 data = {}
448 448 for k, meta in sorted(args.items()):
449 449 # This argument wasn't passed by the client.
450 450 if k not in self._args:
451 451 data[k] = meta['default']()
452 452 continue
453 453
454 454 v = self._args[k]
455 455
456 456 # Sets may be expressed as lists. Silently normalize.
457 457 if meta['type'] == 'set' and isinstance(v, list):
458 458 v = set(v)
459 459
460 460 # TODO consider more/stronger type validation.
461 461
462 462 data[k] = v
463 463
464 464 return data
465 465
466 466 def getprotocaps(self):
467 467 # Protocol capabilities are currently not implemented for HTTP V2.
468 468 return set()
469 469
470 470 def getpayload(self):
471 471 raise NotImplementedError
472 472
473 473 @contextlib.contextmanager
474 474 def mayberedirectstdio(self):
475 475 raise NotImplementedError
476 476
477 477 def client(self):
478 478 raise NotImplementedError
479 479
480 480 def addcapabilities(self, repo, caps):
481 481 return caps
482 482
483 483 def checkperm(self, perm):
484 484 raise NotImplementedError
485 485
486 486 def httpv2apidescriptor(req, repo):
487 487 proto = httpv2protocolhandler(req, repo.ui)
488 488
489 489 return _capabilitiesv2(repo, proto)
490 490
491 491 def _capabilitiesv2(repo, proto):
492 492 """Obtain the set of capabilities for version 2 transports.
493 493
494 494 These capabilities are distinct from the capabilities for version 1
495 495 transports.
496 496 """
497 497 caps = {
498 498 'commands': {},
499 499 'framingmediatypes': [FRAMINGTYPE],
500 500 'pathfilterprefixes': set(narrowspec.VALID_PREFIXES),
501 501 }
502 502
503 503 for command, entry in COMMANDS.items():
504 504 args = {}
505 505
506 506 for arg, meta in entry.args.items():
507 507 args[arg] = {
508 508 # TODO should this be a normalized type using CBOR's
509 509 # terminology?
510 510 b'type': meta['type'],
511 511 b'required': meta['required'],
512 512 }
513 513
514 514 if not meta['required']:
515 515 args[arg][b'default'] = meta['default']()
516 516
517 517 if meta['validvalues']:
518 518 args[arg][b'validvalues'] = meta['validvalues']
519 519
520 520 # TODO this type of check should be defined in a per-command callback.
521 521 if (command == b'rawstorefiledata'
522 522 and not streamclone.allowservergeneration(repo)):
523 523 continue
524 524
525 525 caps['commands'][command] = {
526 526 'args': args,
527 527 'permissions': [entry.permission],
528 528 }
529 529
530 530 if entry.extracapabilitiesfn:
531 531 extracaps = entry.extracapabilitiesfn(repo, proto)
532 532 caps['commands'][command].update(extracaps)
533 533
534 534 caps['rawrepoformats'] = sorted(repo.requirements &
535 535 repo.supportedformats)
536 536
537 537 targets = getadvertisedredirecttargets(repo, proto)
538 538 if targets:
539 539 caps[b'redirect'] = {
540 540 b'targets': [],
541 541 b'hashes': [b'sha256', b'sha1'],
542 542 }
543 543
544 544 for target in targets:
545 545 entry = {
546 546 b'name': target['name'],
547 547 b'protocol': target['protocol'],
548 548 b'uris': target['uris'],
549 549 }
550 550
551 551 for key in ('snirequired', 'tlsversions'):
552 552 if key in target:
553 553 entry[key] = target[key]
554 554
555 555 caps[b'redirect'][b'targets'].append(entry)
556 556
557 557 return proto.addcapabilities(repo, caps)
558 558
559 559 def getadvertisedredirecttargets(repo, proto):
560 560 """Obtain a list of content redirect targets.
561 561
562 562 Returns a list containing potential redirect targets that will be
563 563 advertised in capabilities data. Each dict MUST have the following
564 564 keys:
565 565
566 566 name
567 567 The name of this redirect target. This is the identifier clients use
568 568 to refer to a target. It is transferred as part of every command
569 569 request.
570 570
571 571 protocol
572 572 Network protocol used by this target. Typically this is the string
573 573 in front of the ``://`` in a URL. e.g. ``https``.
574 574
575 575 uris
576 576 List of representative URIs for this target. Clients can use the
577 577 URIs to test parsing for compatibility or for ordering preference
578 578 for which target to use.
579 579
580 580 The following optional keys are recognized:
581 581
582 582 snirequired
583 583 Bool indicating if Server Name Indication (SNI) is required to
584 584 connect to this target.
585 585
586 586 tlsversions
587 587 List of bytes indicating which TLS versions are supported by this
588 588 target.
589 589
590 590 By default, clients reflect the target order advertised by servers
591 591 and servers will use the first client-advertised target when picking
592 592 a redirect target. So targets should be advertised in the order the
593 593 server prefers they be used.
594 594 """
595 595 return []
596 596
597 597 def wireprotocommand(name, args=None, permission='push', cachekeyfn=None,
598 598 extracapabilitiesfn=None):
599 599 """Decorator to declare a wire protocol command.
600 600
601 601 ``name`` is the name of the wire protocol command being provided.
602 602
603 603 ``args`` is a dict defining arguments accepted by the command. Keys are
604 604 the argument name. Values are dicts with the following keys:
605 605
606 606 ``type``
607 607 The argument data type. Must be one of the following string
608 608 literals: ``bytes``, ``int``, ``list``, ``dict``, ``set``,
609 609 or ``bool``.
610 610
611 611 ``default``
612 612 A callable returning the default value for this argument. If not
613 613 specified, ``None`` will be the default value.
614 614
615 615 ``example``
616 616 An example value for this argument.
617 617
618 618 ``validvalues``
619 619 Set of recognized values for this argument.
620 620
621 621 ``permission`` defines the permission type needed to run this command.
622 622 Can be ``push`` or ``pull``. These roughly map to read-write and read-only,
623 623 respectively. Default is to assume command requires ``push`` permissions
624 624 because otherwise commands not declaring their permissions could modify
625 625 a repository that is supposed to be read-only.
626 626
627 627 ``cachekeyfn`` defines an optional callable that can derive the
628 628 cache key for this request.
629 629
630 630 ``extracapabilitiesfn`` defines an optional callable that defines extra
631 631 command capabilities/parameters that are advertised next to the command
632 632 in the capabilities data structure describing the server. The callable
633 633 receives as arguments the repository and protocol objects. It returns
634 634 a dict of extra fields to add to the command descriptor.
635 635
636 636 Wire protocol commands are generators of objects to be serialized and
637 637 sent to the client.
638 638
639 639 If a command raises an uncaught exception, this will be translated into
640 640 a command error.
641 641
642 642 All commands can opt in to being cacheable by defining a function
643 643 (``cachekeyfn``) that is called to derive a cache key. This function
644 644 receives the same arguments as the command itself plus a ``cacher``
645 645 argument containing the active cacher for the request and returns a bytes
646 646 containing the key in a cache the response to this command may be cached
647 647 under.
648 648 """
649 649 transports = {k for k, v in wireprototypes.TRANSPORTS.items()
650 650 if v['version'] == 2}
651 651
652 652 if permission not in ('push', 'pull'):
653 653 raise error.ProgrammingError('invalid wire protocol permission; '
654 654 'got %s; expected "push" or "pull"' %
655 655 permission)
656 656
657 657 if args is None:
658 658 args = {}
659 659
660 660 if not isinstance(args, dict):
661 661 raise error.ProgrammingError('arguments for version 2 commands '
662 662 'must be declared as dicts')
663 663
664 664 for arg, meta in args.items():
665 665 if arg == '*':
666 666 raise error.ProgrammingError('* argument name not allowed on '
667 667 'version 2 commands')
668 668
669 669 if not isinstance(meta, dict):
670 670 raise error.ProgrammingError('arguments for version 2 commands '
671 671 'must declare metadata as a dict')
672 672
673 673 if 'type' not in meta:
674 674 raise error.ProgrammingError('%s argument for command %s does not '
675 675 'declare type field' % (arg, name))
676 676
677 677 if meta['type'] not in ('bytes', 'int', 'list', 'dict', 'set', 'bool'):
678 678 raise error.ProgrammingError('%s argument for command %s has '
679 679 'illegal type: %s' % (arg, name,
680 680 meta['type']))
681 681
682 682 if 'example' not in meta:
683 683 raise error.ProgrammingError('%s argument for command %s does not '
684 684 'declare example field' % (arg, name))
685 685
686 686 meta['required'] = 'default' not in meta
687 687
688 688 meta.setdefault('default', lambda: None)
689 689 meta.setdefault('validvalues', None)
690 690
691 691 def register(func):
692 692 if name in COMMANDS:
693 693 raise error.ProgrammingError('%s command already registered '
694 694 'for version 2' % name)
695 695
696 696 COMMANDS[name] = wireprototypes.commandentry(
697 697 func, args=args, transports=transports, permission=permission,
698 698 cachekeyfn=cachekeyfn, extracapabilitiesfn=extracapabilitiesfn)
699 699
700 700 return func
701 701
702 702 return register
703 703
704 704 def makecommandcachekeyfn(command, localversion=None, allargs=False):
705 705 """Construct a cache key derivation function with common features.
706 706
707 707 By default, the cache key is a hash of:
708 708
709 709 * The command name.
710 710 * A global cache version number.
711 711 * A local cache version number (passed via ``localversion``).
712 712 * All the arguments passed to the command.
713 713 * The media type used.
714 714 * Wire protocol version string.
715 715 * The repository path.
716 716 """
717 717 if not allargs:
718 718 raise error.ProgrammingError('only allargs=True is currently supported')
719 719
720 720 if localversion is None:
721 721 raise error.ProgrammingError('must set localversion argument value')
722 722
723 723 def cachekeyfn(repo, proto, cacher, **args):
724 724 spec = COMMANDS[command]
725 725
726 726 # Commands that mutate the repo can not be cached.
727 727 if spec.permission == 'push':
728 728 return None
729 729
730 730 # TODO config option to disable caching.
731 731
732 732 # Our key derivation strategy is to construct a data structure
733 733 # holding everything that could influence cacheability and to hash
734 734 # the CBOR representation of that. Using CBOR seems like it might
735 735 # be overkill. However, simpler hashing mechanisms are prone to
736 736 # duplicate input issues. e.g. if you just concatenate two values,
737 737 # "foo"+"bar" is identical to "fo"+"obar". Using CBOR provides
738 738 # "padding" between values and prevents these problems.
739 739
740 740 # Seed the hash with various data.
741 741 state = {
742 742 # To invalidate all cache keys.
743 743 b'globalversion': GLOBAL_CACHE_VERSION,
744 744 # More granular cache key invalidation.
745 745 b'localversion': localversion,
746 746 # Cache keys are segmented by command.
747 747 b'command': pycompat.sysbytes(command),
748 748 # Throw in the media type and API version strings so changes
749 749 # to exchange semantics invalid cache.
750 750 b'mediatype': FRAMINGTYPE,
751 751 b'version': HTTP_WIREPROTO_V2,
752 752 # So same requests for different repos don't share cache keys.
753 753 b'repo': repo.root,
754 754 }
755 755
756 756 # The arguments passed to us will have already been normalized.
757 757 # Default values will be set, etc. This is important because it
758 758 # means that it doesn't matter if clients send an explicit argument
759 759 # or rely on the default value: it will all normalize to the same
760 760 # set of arguments on the server and therefore the same cache key.
761 761 #
762 762 # Arguments by their very nature must support being encoded to CBOR.
763 763 # And the CBOR encoder is deterministic. So we hash the arguments
764 764 # by feeding the CBOR of their representation into the hasher.
765 765 if allargs:
766 766 state[b'args'] = pycompat.byteskwargs(args)
767 767
768 768 cacher.adjustcachekeystate(state)
769 769
770 770 hasher = hashlib.sha1()
771 771 for chunk in cborutil.streamencode(state):
772 772 hasher.update(chunk)
773 773
774 774 return pycompat.sysbytes(hasher.hexdigest())
775 775
776 776 return cachekeyfn
777 777
778 778 def makeresponsecacher(repo, proto, command, args, objencoderfn,
779 779 redirecttargets, redirecthashes):
780 780 """Construct a cacher for a cacheable command.
781 781
782 782 Returns an ``iwireprotocolcommandcacher`` instance.
783 783
784 784 Extensions can monkeypatch this function to provide custom caching
785 785 backends.
786 786 """
787 787 return None
788 788
789 789 def resolvenodes(repo, revisions):
790 790 """Resolve nodes from a revisions specifier data structure."""
791 791 cl = repo.changelog
792 792 clhasnode = cl.hasnode
793 793
794 794 seen = set()
795 795 nodes = []
796 796
797 797 if not isinstance(revisions, list):
798 798 raise error.WireprotoCommandError('revisions must be defined as an '
799 799 'array')
800 800
801 801 for spec in revisions:
802 802 if b'type' not in spec:
803 803 raise error.WireprotoCommandError(
804 804 'type key not present in revision specifier')
805 805
806 806 typ = spec[b'type']
807 807
808 808 if typ == b'changesetexplicit':
809 809 if b'nodes' not in spec:
810 810 raise error.WireprotoCommandError(
811 811 'nodes key not present in changesetexplicit revision '
812 812 'specifier')
813 813
814 814 for node in spec[b'nodes']:
815 815 if node not in seen:
816 816 nodes.append(node)
817 817 seen.add(node)
818 818
819 819 elif typ == b'changesetexplicitdepth':
820 820 for key in (b'nodes', b'depth'):
821 821 if key not in spec:
822 822 raise error.WireprotoCommandError(
823 823 '%s key not present in changesetexplicitdepth revision '
824 824 'specifier', (key,))
825 825
826 826 for rev in repo.revs(b'ancestors(%ln, %d)', spec[b'nodes'],
827 827 spec[b'depth'] - 1):
828 828 node = cl.node(rev)
829 829
830 830 if node not in seen:
831 831 nodes.append(node)
832 832 seen.add(node)
833 833
834 834 elif typ == b'changesetdagrange':
835 835 for key in (b'roots', b'heads'):
836 836 if key not in spec:
837 837 raise error.WireprotoCommandError(
838 838 '%s key not present in changesetdagrange revision '
839 839 'specifier', (key,))
840 840
841 841 if not spec[b'heads']:
842 842 raise error.WireprotoCommandError(
843 843 'heads key in changesetdagrange cannot be empty')
844 844
845 845 if spec[b'roots']:
846 846 common = [n for n in spec[b'roots'] if clhasnode(n)]
847 847 else:
848 848 common = [nullid]
849 849
850 850 for n in discovery.outgoing(repo, common, spec[b'heads']).missing:
851 851 if n not in seen:
852 852 nodes.append(n)
853 853 seen.add(n)
854 854
855 855 else:
856 856 raise error.WireprotoCommandError(
857 857 'unknown revision specifier type: %s', (typ,))
858 858
859 859 return nodes
860 860
861 861 @wireprotocommand('branchmap', permission='pull')
862 862 def branchmapv2(repo, proto):
863 863 yield {encoding.fromlocal(k): v
864 864 for k, v in repo.branchmap().iteritems()}
865 865
866 866 @wireprotocommand('capabilities', permission='pull')
867 867 def capabilitiesv2(repo, proto):
868 868 yield _capabilitiesv2(repo, proto)
869 869
870 870 @wireprotocommand(
871 871 'changesetdata',
872 872 args={
873 873 'revisions': {
874 874 'type': 'list',
875 875 'example': [{
876 876 b'type': b'changesetexplicit',
877 877 b'nodes': [b'abcdef...'],
878 878 }],
879 879 },
880 880 'fields': {
881 881 'type': 'set',
882 882 'default': set,
883 883 'example': {b'parents', b'revision'},
884 884 'validvalues': {b'bookmarks', b'parents', b'phase', b'revision'},
885 885 },
886 886 },
887 887 permission='pull')
888 888 def changesetdata(repo, proto, revisions, fields):
889 889 # TODO look for unknown fields and abort when they can't be serviced.
890 890 # This could probably be validated by dispatcher using validvalues.
891 891
892 892 cl = repo.changelog
893 893 outgoing = resolvenodes(repo, revisions)
894 894 publishing = repo.publishing()
895 895
896 896 if outgoing:
897 897 repo.hook('preoutgoing', throw=True, source='serve')
898 898
899 899 yield {
900 900 b'totalitems': len(outgoing),
901 901 }
902 902
903 903 # The phases of nodes already transferred to the client may have changed
904 904 # since the client last requested data. We send phase-only records
905 905 # for these revisions, if requested.
906 906 # TODO actually do this. We'll probably want to emit phase heads
907 907 # in the ancestry set of the outgoing revisions. This will ensure
908 908 # that phase updates within that set are seen.
909 909 if b'phase' in fields:
910 910 pass
911 911
912 912 nodebookmarks = {}
913 913 for mark, node in repo._bookmarks.items():
914 914 nodebookmarks.setdefault(node, set()).add(mark)
915 915
916 916 # It is already topologically sorted by revision number.
917 917 for node in outgoing:
918 918 d = {
919 919 b'node': node,
920 920 }
921 921
922 922 if b'parents' in fields:
923 923 d[b'parents'] = cl.parents(node)
924 924
925 925 if b'phase' in fields:
926 926 if publishing:
927 927 d[b'phase'] = b'public'
928 928 else:
929 929 ctx = repo[node]
930 930 d[b'phase'] = ctx.phasestr()
931 931
932 932 if b'bookmarks' in fields and node in nodebookmarks:
933 933 d[b'bookmarks'] = sorted(nodebookmarks[node])
934 934 del nodebookmarks[node]
935 935
936 936 followingmeta = []
937 937 followingdata = []
938 938
939 939 if b'revision' in fields:
940 940 revisiondata = cl.revision(node, raw=True)
941 941 followingmeta.append((b'revision', len(revisiondata)))
942 942 followingdata.append(revisiondata)
943 943
944 944 # TODO make it possible for extensions to wrap a function or register
945 945 # a handler to service custom fields.
946 946
947 947 if followingmeta:
948 948 d[b'fieldsfollowing'] = followingmeta
949 949
950 950 yield d
951 951
952 952 for extra in followingdata:
953 953 yield extra
954 954
955 955 # If requested, send bookmarks from nodes that didn't have revision
956 956 # data sent so receiver is aware of any bookmark updates.
957 957 if b'bookmarks' in fields:
958 958 for node, marks in sorted(nodebookmarks.iteritems()):
959 959 yield {
960 960 b'node': node,
961 961 b'bookmarks': sorted(marks),
962 962 }
963 963
964 964 class FileAccessError(Exception):
965 965 """Represents an error accessing a specific file."""
966 966
967 967 def __init__(self, path, msg, args):
968 968 self.path = path
969 969 self.msg = msg
970 970 self.args = args
971 971
972 972 def getfilestore(repo, proto, path):
973 973 """Obtain a file storage object for use with wire protocol.
974 974
975 975 Exists as a standalone function so extensions can monkeypatch to add
976 976 access control.
977 977 """
978 978 # This seems to work even if the file doesn't exist. So catch
979 979 # "empty" files and return an error.
980 980 fl = repo.file(path)
981 981
982 982 if not len(fl):
983 983 raise FileAccessError(path, 'unknown file: %s', (path,))
984 984
985 985 return fl
986 986
987 987 def emitfilerevisions(repo, path, revisions, fields):
988 988 clnode = repo.changelog.node
989 989
990 990 for revision in revisions:
991 991 d = {
992 992 b'node': revision.node,
993 993 }
994 994
995 995 if b'parents' in fields:
996 996 d[b'parents'] = [revision.p1node, revision.p2node]
997 997
998 998 if b'linknode' in fields:
999 999 # TODO by creating the filectx against a specific file revision
1000 1000 # instead of changeset, linkrev() is always used. This is wrong for
1001 1001 # cases where linkrev() may refer to a hidden changeset. We need an
1002 1002 # API for performing linkrev adjustment that takes this into
1003 1003 # account.
1004 1004 fctx = repo.filectx(path, fileid=revision.node)
1005 1005 d[b'linknode'] = clnode(fctx.introrev())
1006 1006
1007 1007 followingmeta = []
1008 1008 followingdata = []
1009 1009
1010 1010 if b'revision' in fields:
1011 1011 if revision.revision is not None:
1012 1012 followingmeta.append((b'revision', len(revision.revision)))
1013 1013 followingdata.append(revision.revision)
1014 1014 else:
1015 1015 d[b'deltabasenode'] = revision.basenode
1016 1016 followingmeta.append((b'delta', len(revision.delta)))
1017 1017 followingdata.append(revision.delta)
1018 1018
1019 1019 if followingmeta:
1020 1020 d[b'fieldsfollowing'] = followingmeta
1021 1021
1022 1022 yield d
1023 1023
1024 1024 for extra in followingdata:
1025 1025 yield extra
1026 1026
1027 1027 def makefilematcher(repo, pathfilter):
1028 1028 """Construct a matcher from a path filter dict."""
1029 1029
1030 1030 # Validate values.
1031 1031 if pathfilter:
1032 1032 for key in (b'include', b'exclude'):
1033 1033 for pattern in pathfilter.get(key, []):
1034 1034 if not pattern.startswith((b'path:', b'rootfilesin:')):
1035 1035 raise error.WireprotoCommandError(
1036 1036 '%s pattern must begin with `path:` or `rootfilesin:`; '
1037 1037 'got %s', (key, pattern))
1038 1038
1039 1039 if pathfilter:
1040 1040 matcher = matchmod.match(repo.root, b'',
1041 1041 include=pathfilter.get(b'include', []),
1042 1042 exclude=pathfilter.get(b'exclude', []))
1043 1043 else:
1044 1044 matcher = matchmod.match(repo.root, b'')
1045 1045
1046 1046 # Requested patterns could include files not in the local store. So
1047 1047 # filter those out.
1048 return matchmod.intersectmatchers(repo.narrowmatch(), matcher)
1048 return repo.narrowmatch(matcher)
1049 1049
1050 1050 @wireprotocommand(
1051 1051 'filedata',
1052 1052 args={
1053 1053 'haveparents': {
1054 1054 'type': 'bool',
1055 1055 'default': lambda: False,
1056 1056 'example': True,
1057 1057 },
1058 1058 'nodes': {
1059 1059 'type': 'list',
1060 1060 'example': [b'0123456...'],
1061 1061 },
1062 1062 'fields': {
1063 1063 'type': 'set',
1064 1064 'default': set,
1065 1065 'example': {b'parents', b'revision'},
1066 1066 'validvalues': {b'parents', b'revision', b'linknode'},
1067 1067 },
1068 1068 'path': {
1069 1069 'type': 'bytes',
1070 1070 'example': b'foo.txt',
1071 1071 }
1072 1072 },
1073 1073 permission='pull',
1074 1074 # TODO censoring a file revision won't invalidate the cache.
1075 1075 # Figure out a way to take censoring into account when deriving
1076 1076 # the cache key.
1077 1077 cachekeyfn=makecommandcachekeyfn('filedata', 1, allargs=True))
1078 1078 def filedata(repo, proto, haveparents, nodes, fields, path):
1079 1079 # TODO this API allows access to file revisions that are attached to
1080 1080 # secret changesets. filesdata does not have this problem. Maybe this
1081 1081 # API should be deleted?
1082 1082
1083 1083 try:
1084 1084 # Extensions may wish to access the protocol handler.
1085 1085 store = getfilestore(repo, proto, path)
1086 1086 except FileAccessError as e:
1087 1087 raise error.WireprotoCommandError(e.msg, e.args)
1088 1088
1089 1089 # Validate requested nodes.
1090 1090 for node in nodes:
1091 1091 try:
1092 1092 store.rev(node)
1093 1093 except error.LookupError:
1094 1094 raise error.WireprotoCommandError('unknown file node: %s',
1095 1095 (hex(node),))
1096 1096
1097 1097 revisions = store.emitrevisions(nodes,
1098 1098 revisiondata=b'revision' in fields,
1099 1099 assumehaveparentrevisions=haveparents)
1100 1100
1101 1101 yield {
1102 1102 b'totalitems': len(nodes),
1103 1103 }
1104 1104
1105 1105 for o in emitfilerevisions(repo, path, revisions, fields):
1106 1106 yield o
1107 1107
1108 1108 def filesdatacapabilities(repo, proto):
1109 1109 batchsize = repo.ui.configint(
1110 1110 b'experimental', b'server.filesdata.recommended-batch-size')
1111 1111 return {
1112 1112 b'recommendedbatchsize': batchsize,
1113 1113 }
1114 1114
1115 1115 @wireprotocommand(
1116 1116 'filesdata',
1117 1117 args={
1118 1118 'haveparents': {
1119 1119 'type': 'bool',
1120 1120 'default': lambda: False,
1121 1121 'example': True,
1122 1122 },
1123 1123 'fields': {
1124 1124 'type': 'set',
1125 1125 'default': set,
1126 1126 'example': {b'parents', b'revision'},
1127 1127 'validvalues': {b'firstchangeset', b'linknode', b'parents',
1128 1128 b'revision'},
1129 1129 },
1130 1130 'pathfilter': {
1131 1131 'type': 'dict',
1132 1132 'default': lambda: None,
1133 1133 'example': {b'include': [b'path:tests']},
1134 1134 },
1135 1135 'revisions': {
1136 1136 'type': 'list',
1137 1137 'example': [{
1138 1138 b'type': b'changesetexplicit',
1139 1139 b'nodes': [b'abcdef...'],
1140 1140 }],
1141 1141 },
1142 1142 },
1143 1143 permission='pull',
1144 1144 # TODO censoring a file revision won't invalidate the cache.
1145 1145 # Figure out a way to take censoring into account when deriving
1146 1146 # the cache key.
1147 1147 cachekeyfn=makecommandcachekeyfn('filesdata', 1, allargs=True),
1148 1148 extracapabilitiesfn=filesdatacapabilities)
1149 1149 def filesdata(repo, proto, haveparents, fields, pathfilter, revisions):
1150 1150 # TODO This should operate on a repo that exposes obsolete changesets. There
1151 1151 # is a race between a client making a push that obsoletes a changeset and
1152 1152 # another client fetching files data for that changeset. If a client has a
1153 1153 # changeset, it should probably be allowed to access files data for that
1154 1154 # changeset.
1155 1155
1156 1156 cl = repo.changelog
1157 1157 outgoing = resolvenodes(repo, revisions)
1158 1158 filematcher = makefilematcher(repo, pathfilter)
1159 1159
1160 1160 # Figure out what needs to be emitted.
1161 1161 changedpaths = set()
1162 1162 fnodes = collections.defaultdict(set)
1163 1163
1164 1164 for node in outgoing:
1165 1165 ctx = repo[node]
1166 1166 changedpaths.update(ctx.files())
1167 1167
1168 1168 changedpaths = sorted(p for p in changedpaths if filematcher(p))
1169 1169
1170 1170 # If ancestors are known, we send file revisions having a linkrev in the
1171 1171 # outgoing set of changeset revisions.
1172 1172 if haveparents:
1173 1173 outgoingclrevs = set(cl.rev(n) for n in outgoing)
1174 1174
1175 1175 for path in changedpaths:
1176 1176 try:
1177 1177 store = getfilestore(repo, proto, path)
1178 1178 except FileAccessError as e:
1179 1179 raise error.WireprotoCommandError(e.msg, e.args)
1180 1180
1181 1181 for rev in store:
1182 1182 linkrev = store.linkrev(rev)
1183 1183
1184 1184 if linkrev in outgoingclrevs:
1185 1185 fnodes[path].add(store.node(rev))
1186 1186
1187 1187 # If ancestors aren't known, we walk the manifests and send all
1188 1188 # encountered file revisions.
1189 1189 else:
1190 1190 for node in outgoing:
1191 1191 mctx = repo[node].manifestctx()
1192 1192
1193 1193 for path, fnode in mctx.read().items():
1194 1194 if filematcher(path):
1195 1195 fnodes[path].add(fnode)
1196 1196
1197 1197 yield {
1198 1198 b'totalpaths': len(fnodes),
1199 1199 b'totalitems': sum(len(v) for v in fnodes.values())
1200 1200 }
1201 1201
1202 1202 for path, filenodes in sorted(fnodes.items()):
1203 1203 try:
1204 1204 store = getfilestore(repo, proto, path)
1205 1205 except FileAccessError as e:
1206 1206 raise error.WireprotoCommandError(e.msg, e.args)
1207 1207
1208 1208 yield {
1209 1209 b'path': path,
1210 1210 b'totalitems': len(filenodes),
1211 1211 }
1212 1212
1213 1213 revisions = store.emitrevisions(filenodes,
1214 1214 revisiondata=b'revision' in fields,
1215 1215 assumehaveparentrevisions=haveparents)
1216 1216
1217 1217 for o in emitfilerevisions(repo, path, revisions, fields):
1218 1218 yield o
1219 1219
1220 1220 @wireprotocommand(
1221 1221 'heads',
1222 1222 args={
1223 1223 'publiconly': {
1224 1224 'type': 'bool',
1225 1225 'default': lambda: False,
1226 1226 'example': False,
1227 1227 },
1228 1228 },
1229 1229 permission='pull')
1230 1230 def headsv2(repo, proto, publiconly):
1231 1231 if publiconly:
1232 1232 repo = repo.filtered('immutable')
1233 1233
1234 1234 yield repo.heads()
1235 1235
1236 1236 @wireprotocommand(
1237 1237 'known',
1238 1238 args={
1239 1239 'nodes': {
1240 1240 'type': 'list',
1241 1241 'default': list,
1242 1242 'example': [b'deadbeef'],
1243 1243 },
1244 1244 },
1245 1245 permission='pull')
1246 1246 def knownv2(repo, proto, nodes):
1247 1247 result = b''.join(b'1' if n else b'0' for n in repo.known(nodes))
1248 1248 yield result
1249 1249
1250 1250 @wireprotocommand(
1251 1251 'listkeys',
1252 1252 args={
1253 1253 'namespace': {
1254 1254 'type': 'bytes',
1255 1255 'example': b'ns',
1256 1256 },
1257 1257 },
1258 1258 permission='pull')
1259 1259 def listkeysv2(repo, proto, namespace):
1260 1260 keys = repo.listkeys(encoding.tolocal(namespace))
1261 1261 keys = {encoding.fromlocal(k): encoding.fromlocal(v)
1262 1262 for k, v in keys.iteritems()}
1263 1263
1264 1264 yield keys
1265 1265
1266 1266 @wireprotocommand(
1267 1267 'lookup',
1268 1268 args={
1269 1269 'key': {
1270 1270 'type': 'bytes',
1271 1271 'example': b'foo',
1272 1272 },
1273 1273 },
1274 1274 permission='pull')
1275 1275 def lookupv2(repo, proto, key):
1276 1276 key = encoding.tolocal(key)
1277 1277
1278 1278 # TODO handle exception.
1279 1279 node = repo.lookup(key)
1280 1280
1281 1281 yield node
1282 1282
1283 1283 def manifestdatacapabilities(repo, proto):
1284 1284 batchsize = repo.ui.configint(
1285 1285 b'experimental', b'server.manifestdata.recommended-batch-size')
1286 1286
1287 1287 return {
1288 1288 b'recommendedbatchsize': batchsize,
1289 1289 }
1290 1290
1291 1291 @wireprotocommand(
1292 1292 'manifestdata',
1293 1293 args={
1294 1294 'nodes': {
1295 1295 'type': 'list',
1296 1296 'example': [b'0123456...'],
1297 1297 },
1298 1298 'haveparents': {
1299 1299 'type': 'bool',
1300 1300 'default': lambda: False,
1301 1301 'example': True,
1302 1302 },
1303 1303 'fields': {
1304 1304 'type': 'set',
1305 1305 'default': set,
1306 1306 'example': {b'parents', b'revision'},
1307 1307 'validvalues': {b'parents', b'revision'},
1308 1308 },
1309 1309 'tree': {
1310 1310 'type': 'bytes',
1311 1311 'example': b'',
1312 1312 },
1313 1313 },
1314 1314 permission='pull',
1315 1315 cachekeyfn=makecommandcachekeyfn('manifestdata', 1, allargs=True),
1316 1316 extracapabilitiesfn=manifestdatacapabilities)
1317 1317 def manifestdata(repo, proto, haveparents, nodes, fields, tree):
1318 1318 store = repo.manifestlog.getstorage(tree)
1319 1319
1320 1320 # Validate the node is known and abort on unknown revisions.
1321 1321 for node in nodes:
1322 1322 try:
1323 1323 store.rev(node)
1324 1324 except error.LookupError:
1325 1325 raise error.WireprotoCommandError(
1326 1326 'unknown node: %s', (node,))
1327 1327
1328 1328 revisions = store.emitrevisions(nodes,
1329 1329 revisiondata=b'revision' in fields,
1330 1330 assumehaveparentrevisions=haveparents)
1331 1331
1332 1332 yield {
1333 1333 b'totalitems': len(nodes),
1334 1334 }
1335 1335
1336 1336 for revision in revisions:
1337 1337 d = {
1338 1338 b'node': revision.node,
1339 1339 }
1340 1340
1341 1341 if b'parents' in fields:
1342 1342 d[b'parents'] = [revision.p1node, revision.p2node]
1343 1343
1344 1344 followingmeta = []
1345 1345 followingdata = []
1346 1346
1347 1347 if b'revision' in fields:
1348 1348 if revision.revision is not None:
1349 1349 followingmeta.append((b'revision', len(revision.revision)))
1350 1350 followingdata.append(revision.revision)
1351 1351 else:
1352 1352 d[b'deltabasenode'] = revision.basenode
1353 1353 followingmeta.append((b'delta', len(revision.delta)))
1354 1354 followingdata.append(revision.delta)
1355 1355
1356 1356 if followingmeta:
1357 1357 d[b'fieldsfollowing'] = followingmeta
1358 1358
1359 1359 yield d
1360 1360
1361 1361 for extra in followingdata:
1362 1362 yield extra
1363 1363
1364 1364 @wireprotocommand(
1365 1365 'pushkey',
1366 1366 args={
1367 1367 'namespace': {
1368 1368 'type': 'bytes',
1369 1369 'example': b'ns',
1370 1370 },
1371 1371 'key': {
1372 1372 'type': 'bytes',
1373 1373 'example': b'key',
1374 1374 },
1375 1375 'old': {
1376 1376 'type': 'bytes',
1377 1377 'example': b'old',
1378 1378 },
1379 1379 'new': {
1380 1380 'type': 'bytes',
1381 1381 'example': 'new',
1382 1382 },
1383 1383 },
1384 1384 permission='push')
1385 1385 def pushkeyv2(repo, proto, namespace, key, old, new):
1386 1386 # TODO handle ui output redirection
1387 1387 yield repo.pushkey(encoding.tolocal(namespace),
1388 1388 encoding.tolocal(key),
1389 1389 encoding.tolocal(old),
1390 1390 encoding.tolocal(new))
1391 1391
1392 1392
1393 1393 @wireprotocommand(
1394 1394 'rawstorefiledata',
1395 1395 args={
1396 1396 'files': {
1397 1397 'type': 'list',
1398 1398 'example': [b'changelog', b'manifestlog'],
1399 1399 },
1400 1400 'pathfilter': {
1401 1401 'type': 'list',
1402 1402 'default': lambda: None,
1403 1403 'example': {b'include': [b'path:tests']},
1404 1404 },
1405 1405 },
1406 1406 permission='pull')
1407 1407 def rawstorefiledata(repo, proto, files, pathfilter):
1408 1408 if not streamclone.allowservergeneration(repo):
1409 1409 raise error.WireprotoCommandError(b'stream clone is disabled')
1410 1410
1411 1411 # TODO support dynamically advertising what store files "sets" are
1412 1412 # available. For now, we support changelog, manifestlog, and files.
1413 1413 files = set(files)
1414 1414 allowedfiles = {b'changelog', b'manifestlog'}
1415 1415
1416 1416 unsupported = files - allowedfiles
1417 1417 if unsupported:
1418 1418 raise error.WireprotoCommandError(b'unknown file type: %s',
1419 1419 (b', '.join(sorted(unsupported)),))
1420 1420
1421 1421 with repo.lock():
1422 1422 topfiles = list(repo.store.topfiles())
1423 1423
1424 1424 sendfiles = []
1425 1425 totalsize = 0
1426 1426
1427 1427 # TODO this is a bunch of storage layer interface abstractions because
1428 1428 # it assumes revlogs.
1429 1429 for name, encodedname, size in topfiles:
1430 1430 if b'changelog' in files and name.startswith(b'00changelog'):
1431 1431 pass
1432 1432 elif b'manifestlog' in files and name.startswith(b'00manifest'):
1433 1433 pass
1434 1434 else:
1435 1435 continue
1436 1436
1437 1437 sendfiles.append((b'store', name, size))
1438 1438 totalsize += size
1439 1439
1440 1440 yield {
1441 1441 b'filecount': len(sendfiles),
1442 1442 b'totalsize': totalsize,
1443 1443 }
1444 1444
1445 1445 for location, name, size in sendfiles:
1446 1446 yield {
1447 1447 b'location': location,
1448 1448 b'path': name,
1449 1449 b'size': size,
1450 1450 }
1451 1451
1452 1452 # We have to use a closure for this to ensure the context manager is
1453 1453 # closed only after sending the final chunk.
1454 1454 def getfiledata():
1455 1455 with repo.svfs(name, 'rb', auditpath=False) as fh:
1456 1456 for chunk in util.filechunkiter(fh, limit=size):
1457 1457 yield chunk
1458 1458
1459 1459 yield wireprototypes.indefinitebytestringresponse(
1460 1460 getfiledata())
General Comments 0
You need to be logged in to leave comments. Login now