wireprotov2server: use pycompat.strkwargs when calling cachekeyfn...
Gregory Szorc
r41421:56fcbac6 default
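The change replaces **args with **pycompat.strkwargs(args) when calling a command's cachekeyfn. Wire protocol argument dicts are keyed by bytes, and Python 3 only accepts native str keys when a dict is **-expanded into keyword arguments, so the keys have to be converted first. Below is a minimal standalone sketch of the failure mode and the conversion; the strkwargs helper and cachekeyfn here are illustrative stand-ins, not Mercurial's actual implementations.

# Illustrative sketch only: strkwargs below mimics what pycompat.strkwargs does,
# decoding bytes keys to native str so the dict can be **-expanded on Python 3.

def strkwargs(dic):
    return {k.decode('latin-1') if isinstance(k, bytes) else k: v
            for k, v in dic.items()}

def cachekeyfn(repo, proto, cacher, **args):
    # Stand-in for a command's cache key function; just reports argument names.
    return sorted(args)

args = {b'nodes': [b'abcdef'], b'haveparents': True}

# cachekeyfn(None, None, None, **args)    # TypeError on Python 3:
                                          # keywords must be strings
print(cachekeyfn(None, None, None, **strkwargs(args)))  # ['haveparents', 'nodes']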
@@ -1,1455 +1,1456 b''
1 1 # Copyright 21 May 2005 - (c) 2005 Jake Edge <jake@edge2.net>
2 2 # Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
3 3 #
4 4 # This software may be used and distributed according to the terms of the
5 5 # GNU General Public License version 2 or any later version.
6 6
7 7 from __future__ import absolute_import
8 8
9 9 import collections
10 10 import contextlib
11 11 import hashlib
12 12
13 13 from .i18n import _
14 14 from .node import (
15 15 hex,
16 16 nullid,
17 17 )
18 18 from . import (
19 19 discovery,
20 20 encoding,
21 21 error,
22 22 match as matchmod,
23 23 narrowspec,
24 24 pycompat,
25 25 streamclone,
26 26 util,
27 27 wireprotoframing,
28 28 wireprototypes,
29 29 )
30 30 from .utils import (
31 31 cborutil,
32 32 interfaceutil,
33 33 stringutil,
34 34 )
35 35
36 36 FRAMINGTYPE = b'application/mercurial-exp-framing-0006'
37 37
38 38 HTTP_WIREPROTO_V2 = wireprototypes.HTTP_WIREPROTO_V2
39 39
40 40 COMMANDS = wireprototypes.commanddict()
41 41
42 42 # Value inserted into cache key computation function. Change the value to
43 43 # force new cache keys for every command request. This should be done when
44 44 # there is a change to how caching works, etc.
45 45 GLOBAL_CACHE_VERSION = 1
46 46
47 47 def handlehttpv2request(rctx, req, res, checkperm, urlparts):
48 48 from .hgweb import common as hgwebcommon
49 49
50 50 # URL space looks like: <permissions>/<command>, where <permission> can
51 51 # be ``ro`` or ``rw`` to signal read-only or read-write, respectively.
52 52
53 53 # Root URL does nothing meaningful... yet.
54 54 if not urlparts:
55 55 res.status = b'200 OK'
56 56 res.headers[b'Content-Type'] = b'text/plain'
57 57 res.setbodybytes(_('HTTP version 2 API handler'))
58 58 return
59 59
60 60 if len(urlparts) == 1:
61 61 res.status = b'404 Not Found'
62 62 res.headers[b'Content-Type'] = b'text/plain'
63 63 res.setbodybytes(_('do not know how to process %s\n') %
64 64 req.dispatchpath)
65 65 return
66 66
67 67 permission, command = urlparts[0:2]
68 68
69 69 if permission not in (b'ro', b'rw'):
70 70 res.status = b'404 Not Found'
71 71 res.headers[b'Content-Type'] = b'text/plain'
72 72 res.setbodybytes(_('unknown permission: %s') % permission)
73 73 return
74 74
75 75 if req.method != 'POST':
76 76 res.status = b'405 Method Not Allowed'
77 77 res.headers[b'Allow'] = b'POST'
78 78 res.setbodybytes(_('commands require POST requests'))
79 79 return
80 80
81 81 # At some point we'll want to use our own API instead of recycling the
82 82 # behavior of version 1 of the wire protocol...
83 83 # TODO return reasonable responses - not responses that overload the
84 84 # HTTP status line message for error reporting.
85 85 try:
86 86 checkperm(rctx, req, 'pull' if permission == b'ro' else 'push')
87 87 except hgwebcommon.ErrorResponse as e:
88 88 res.status = hgwebcommon.statusmessage(e.code, pycompat.bytestr(e))
89 89 for k, v in e.headers:
90 90 res.headers[k] = v
91 91 res.setbodybytes('permission denied')
92 92 return
93 93
94 94 # We have a special endpoint to reflect the request back at the client.
95 95 if command == b'debugreflect':
96 96 _processhttpv2reflectrequest(rctx.repo.ui, rctx.repo, req, res)
97 97 return
98 98
99 99 # Extra commands that we handle that aren't really wire protocol
100 100 # commands. Think extra hard before making this hackery available to
101 101 # extensions.
102 102 extracommands = {'multirequest'}
103 103
104 104 if command not in COMMANDS and command not in extracommands:
105 105 res.status = b'404 Not Found'
106 106 res.headers[b'Content-Type'] = b'text/plain'
107 107 res.setbodybytes(_('unknown wire protocol command: %s\n') % command)
108 108 return
109 109
110 110 repo = rctx.repo
111 111 ui = repo.ui
112 112
113 113 proto = httpv2protocolhandler(req, ui)
114 114
115 115 if (not COMMANDS.commandavailable(command, proto)
116 116 and command not in extracommands):
117 117 res.status = b'404 Not Found'
118 118 res.headers[b'Content-Type'] = b'text/plain'
119 119 res.setbodybytes(_('invalid wire protocol command: %s') % command)
120 120 return
121 121
122 122 # TODO consider cases where proxies may add additional Accept headers.
123 123 if req.headers.get(b'Accept') != FRAMINGTYPE:
124 124 res.status = b'406 Not Acceptable'
125 125 res.headers[b'Content-Type'] = b'text/plain'
126 126 res.setbodybytes(_('client MUST specify Accept header with value: %s\n')
127 127 % FRAMINGTYPE)
128 128 return
129 129
130 130 if req.headers.get(b'Content-Type') != FRAMINGTYPE:
131 131 res.status = b'415 Unsupported Media Type'
132 132 # TODO we should send a response with appropriate media type,
133 133 # since client does Accept it.
134 134 res.headers[b'Content-Type'] = b'text/plain'
135 135 res.setbodybytes(_('client MUST send Content-Type header with '
136 136 'value: %s\n') % FRAMINGTYPE)
137 137 return
138 138
139 139 _processhttpv2request(ui, repo, req, res, permission, command, proto)
140 140
141 141 def _processhttpv2reflectrequest(ui, repo, req, res):
142 142 """Reads unified frame protocol request and dumps out state to client.
143 143
144 144 This special endpoint can be used to help debug the wire protocol.
145 145
146 146 Instead of routing the request through the normal dispatch mechanism,
147 147 we instead read all frames, decode them, and feed them into our state
148 148 tracker. We then dump the log of all that activity back out to the
149 149 client.
150 150 """
151 151 import json
152 152
153 153 # Reflection APIs have a history of being abused, accidentally disclosing
154 154 # sensitive data, etc. So we have a config knob.
155 155 if not ui.configbool('experimental', 'web.api.debugreflect'):
156 156 res.status = b'404 Not Found'
157 157 res.headers[b'Content-Type'] = b'text/plain'
158 158 res.setbodybytes(_('debugreflect service not available'))
159 159 return
160 160
161 161 # We assume we have a unified framing protocol request body.
162 162
163 163 reactor = wireprotoframing.serverreactor(ui)
164 164 states = []
165 165
166 166 while True:
167 167 frame = wireprotoframing.readframe(req.bodyfh)
168 168
169 169 if not frame:
170 170 states.append(b'received: <no frame>')
171 171 break
172 172
173 173 states.append(b'received: %d %d %d %s' % (frame.typeid, frame.flags,
174 174 frame.requestid,
175 175 frame.payload))
176 176
177 177 action, meta = reactor.onframerecv(frame)
178 178 states.append(json.dumps((action, meta), sort_keys=True,
179 179 separators=(', ', ': ')))
180 180
181 181 action, meta = reactor.oninputeof()
182 182 meta['action'] = action
183 183 states.append(json.dumps(meta, sort_keys=True, separators=(', ',': ')))
184 184
185 185 res.status = b'200 OK'
186 186 res.headers[b'Content-Type'] = b'text/plain'
187 187 res.setbodybytes(b'\n'.join(states))
188 188
189 189 def _processhttpv2request(ui, repo, req, res, authedperm, reqcommand, proto):
190 190 """Post-validation handler for HTTPv2 requests.
191 191
192 192 Called when the HTTP request contains unified frame-based protocol
193 193 frames for evaluation.
194 194 """
195 195 # TODO Some HTTP clients are full duplex and can receive data before
196 196 # the entire request is transmitted. Figure out a way to indicate support
197 197 # for that so we can opt into full duplex mode.
198 198 reactor = wireprotoframing.serverreactor(ui, deferoutput=True)
199 199 seencommand = False
200 200
201 201 outstream = None
202 202
203 203 while True:
204 204 frame = wireprotoframing.readframe(req.bodyfh)
205 205 if not frame:
206 206 break
207 207
208 208 action, meta = reactor.onframerecv(frame)
209 209
210 210 if action == 'wantframe':
211 211 # Need more data before we can do anything.
212 212 continue
213 213 elif action == 'runcommand':
214 214 # Defer creating output stream because we need to wait for
215 215 # protocol settings frames so proper encoding can be applied.
216 216 if not outstream:
217 217 outstream = reactor.makeoutputstream()
218 218
219 219 sentoutput = _httpv2runcommand(ui, repo, req, res, authedperm,
220 220 reqcommand, reactor, outstream,
221 221 meta, issubsequent=seencommand)
222 222
223 223 if sentoutput:
224 224 return
225 225
226 226 seencommand = True
227 227
228 228 elif action == 'error':
229 229 # TODO define proper error mechanism.
230 230 res.status = b'200 OK'
231 231 res.headers[b'Content-Type'] = b'text/plain'
232 232 res.setbodybytes(meta['message'] + b'\n')
233 233 return
234 234 else:
235 235 raise error.ProgrammingError(
236 236 'unhandled action from frame processor: %s' % action)
237 237
238 238 action, meta = reactor.oninputeof()
239 239 if action == 'sendframes':
240 240 # We assume we haven't started sending the response yet. If we're
241 241 # wrong, the response type will raise an exception.
242 242 res.status = b'200 OK'
243 243 res.headers[b'Content-Type'] = FRAMINGTYPE
244 244 res.setbodygen(meta['framegen'])
245 245 elif action == 'noop':
246 246 pass
247 247 else:
248 248 raise error.ProgrammingError('unhandled action from frame processor: %s'
249 249 % action)
250 250
251 251 def _httpv2runcommand(ui, repo, req, res, authedperm, reqcommand, reactor,
252 252 outstream, command, issubsequent):
253 253 """Dispatch a wire protocol command made from HTTPv2 requests.
254 254
255 255 The authenticated permission (``authedperm``) along with the original
256 256 command from the URL (``reqcommand``) are passed in.
257 257 """
258 258 # We already validated that the session has permissions to perform the
259 259 # actions in ``authedperm``. In the unified frame protocol, the canonical
260 260 # command to run is expressed in a frame. However, the URL also requested
261 261 # to run a specific command. We need to be careful that the command we
262 262 # run doesn't have permissions requirements greater than what was granted
263 263 # by ``authedperm``.
264 264 #
265 265 # Our rule for this is we only allow one command per HTTP request and
266 266 # that command must match the command in the URL. However, we make
267 267 # an exception for the ``multirequest`` URL. This URL is allowed to
268 268 # execute multiple commands. We double check permissions of each command
269 269 # as it is invoked to ensure there is no privilege escalation.
270 270 # TODO consider allowing multiple commands to regular command URLs
271 271 # iff each command is the same.
272 272
273 273 proto = httpv2protocolhandler(req, ui, args=command['args'])
274 274
275 275 if reqcommand == b'multirequest':
276 276 if not COMMANDS.commandavailable(command['command'], proto):
277 277 # TODO proper error mechanism
278 278 res.status = b'200 OK'
279 279 res.headers[b'Content-Type'] = b'text/plain'
280 280 res.setbodybytes(_('wire protocol command not available: %s') %
281 281 command['command'])
282 282 return True
283 283
284 284 # TODO don't use assert here, since it may be elided by -O.
285 285 assert authedperm in (b'ro', b'rw')
286 286 wirecommand = COMMANDS[command['command']]
287 287 assert wirecommand.permission in ('push', 'pull')
288 288
289 289 if authedperm == b'ro' and wirecommand.permission != 'pull':
290 290 # TODO proper error mechanism
291 291 res.status = b'403 Forbidden'
292 292 res.headers[b'Content-Type'] = b'text/plain'
293 293 res.setbodybytes(_('insufficient permissions to execute '
294 294 'command: %s') % command['command'])
295 295 return True
296 296
297 297 # TODO should we also call checkperm() here? Maybe not if we're going
298 298 # to overhaul that API. The granted scope from the URL check should
299 299 # be good enough.
300 300
301 301 else:
302 302 # Don't allow multiple commands outside of ``multirequest`` URL.
303 303 if issubsequent:
304 304 # TODO proper error mechanism
305 305 res.status = b'200 OK'
306 306 res.headers[b'Content-Type'] = b'text/plain'
307 307 res.setbodybytes(_('multiple commands cannot be issued to this '
308 308 'URL'))
309 309 return True
310 310
311 311 if reqcommand != command['command']:
312 312 # TODO define proper error mechanism
313 313 res.status = b'200 OK'
314 314 res.headers[b'Content-Type'] = b'text/plain'
315 315 res.setbodybytes(_('command in frame must match command in URL'))
316 316 return True
317 317
318 318 res.status = b'200 OK'
319 319 res.headers[b'Content-Type'] = FRAMINGTYPE
320 320
321 321 try:
322 322 objs = dispatch(repo, proto, command['command'], command['redirect'])
323 323
324 324 action, meta = reactor.oncommandresponsereadyobjects(
325 325 outstream, command['requestid'], objs)
326 326
327 327 except error.WireprotoCommandError as e:
328 328 action, meta = reactor.oncommanderror(
329 329 outstream, command['requestid'], e.message, e.messageargs)
330 330
331 331 except Exception as e:
332 332 action, meta = reactor.onservererror(
333 333 outstream, command['requestid'],
334 334 _('exception when invoking command: %s') %
335 335 stringutil.forcebytestr(e))
336 336
337 337 if action == 'sendframes':
338 338 res.setbodygen(meta['framegen'])
339 339 return True
340 340 elif action == 'noop':
341 341 return False
342 342 else:
343 343 raise error.ProgrammingError('unhandled event from reactor: %s' %
344 344 action)
345 345
346 346 def getdispatchrepo(repo, proto, command):
347 347 return repo.filtered('served')
348 348
349 349 def dispatch(repo, proto, command, redirect):
350 350 """Run a wire protocol command.
351 351
352 352 Returns an iterable of objects that will be sent to the client.
353 353 """
354 354 repo = getdispatchrepo(repo, proto, command)
355 355
356 356 entry = COMMANDS[command]
357 357 func = entry.func
358 358 spec = entry.args
359 359
360 360 args = proto.getargs(spec)
361 361
362 362 # There is some duplicate boilerplate code here for calling the command and
363 363 # emitting objects. It is either that or a lot of indented code that looks
364 364 # like a pyramid (since there are a lot of code paths that result in not
365 365 # using the cacher).
366 366 callcommand = lambda: func(repo, proto, **pycompat.strkwargs(args))
367 367
368 368 # Request is not cacheable. Don't bother instantiating a cacher.
369 369 if not entry.cachekeyfn:
370 370 for o in callcommand():
371 371 yield o
372 372 return
373 373
374 374 if redirect:
375 375 redirecttargets = redirect[b'targets']
376 376 redirecthashes = redirect[b'hashes']
377 377 else:
378 378 redirecttargets = []
379 379 redirecthashes = []
380 380
381 381 cacher = makeresponsecacher(repo, proto, command, args,
382 382 cborutil.streamencode,
383 383 redirecttargets=redirecttargets,
384 384 redirecthashes=redirecthashes)
385 385
386 386 # But we have no cacher. Do default handling.
387 387 if not cacher:
388 388 for o in callcommand():
389 389 yield o
390 390 return
391 391
392 392 with cacher:
393 cachekey = entry.cachekeyfn(repo, proto, cacher, **args)
393 cachekey = entry.cachekeyfn(repo, proto, cacher,
394 **pycompat.strkwargs(args))
394 395
395 396 # No cache key or the cacher doesn't like it. Do default handling.
396 397 if cachekey is None or not cacher.setcachekey(cachekey):
397 398 for o in callcommand():
398 399 yield o
399 400 return
400 401
401 402 # Serve it from the cache, if possible.
402 403 cached = cacher.lookup()
403 404
404 405 if cached:
405 406 for o in cached['objs']:
406 407 yield o
407 408 return
408 409
409 410 # Else call the command and feed its output into the cacher, allowing
410 411 # the cacher to buffer/mutate objects as it desires.
411 412 for o in callcommand():
412 413 for o in cacher.onobject(o):
413 414 yield o
414 415
415 416 for o in cacher.onfinished():
416 417 yield o
417 418
418 419 @interfaceutil.implementer(wireprototypes.baseprotocolhandler)
419 420 class httpv2protocolhandler(object):
420 421 def __init__(self, req, ui, args=None):
421 422 self._req = req
422 423 self._ui = ui
423 424 self._args = args
424 425
425 426 @property
426 427 def name(self):
427 428 return HTTP_WIREPROTO_V2
428 429
429 430 def getargs(self, args):
430 431 # First look for args that were passed but aren't registered on this
431 432 # command.
432 433 extra = set(self._args) - set(args)
433 434 if extra:
434 435 raise error.WireprotoCommandError(
435 436 'unsupported argument to command: %s' %
436 437 ', '.join(sorted(extra)))
437 438
438 439 # And look for required arguments that are missing.
439 440 missing = {a for a in args if args[a]['required']} - set(self._args)
440 441
441 442 if missing:
442 443 raise error.WireprotoCommandError(
443 444 'missing required arguments: %s' % ', '.join(sorted(missing)))
444 445
445 446 # Now derive the arguments to pass to the command, taking into
446 447 # account the arguments specified by the client.
447 448 data = {}
448 449 for k, meta in sorted(args.items()):
449 450 # This argument wasn't passed by the client.
450 451 if k not in self._args:
451 452 data[k] = meta['default']()
452 453 continue
453 454
454 455 v = self._args[k]
455 456
456 457 # Sets may be expressed as lists. Silently normalize.
457 458 if meta['type'] == 'set' and isinstance(v, list):
458 459 v = set(v)
459 460
460 461 # TODO consider more/stronger type validation.
461 462
462 463 data[k] = v
463 464
464 465 return data
465 466
466 467 def getprotocaps(self):
467 468 # Protocol capabilities are currently not implemented for HTTP V2.
468 469 return set()
469 470
470 471 def getpayload(self):
471 472 raise NotImplementedError
472 473
473 474 @contextlib.contextmanager
474 475 def mayberedirectstdio(self):
475 476 raise NotImplementedError
476 477
477 478 def client(self):
478 479 raise NotImplementedError
479 480
480 481 def addcapabilities(self, repo, caps):
481 482 return caps
482 483
483 484 def checkperm(self, perm):
484 485 raise NotImplementedError
485 486
486 487 def httpv2apidescriptor(req, repo):
487 488 proto = httpv2protocolhandler(req, repo.ui)
488 489
489 490 return _capabilitiesv2(repo, proto)
490 491
491 492 def _capabilitiesv2(repo, proto):
492 493 """Obtain the set of capabilities for version 2 transports.
493 494
494 495 These capabilities are distinct from the capabilities for version 1
495 496 transports.
496 497 """
497 498 caps = {
498 499 'commands': {},
499 500 'framingmediatypes': [FRAMINGTYPE],
500 501 'pathfilterprefixes': set(narrowspec.VALID_PREFIXES),
501 502 }
502 503
503 504 for command, entry in COMMANDS.items():
504 505 args = {}
505 506
506 507 for arg, meta in entry.args.items():
507 508 args[arg] = {
508 509 # TODO should this be a normalized type using CBOR's
509 510 # terminology?
510 511 b'type': meta['type'],
511 512 b'required': meta['required'],
512 513 }
513 514
514 515 if not meta['required']:
515 516 args[arg][b'default'] = meta['default']()
516 517
517 518 if meta['validvalues']:
518 519 args[arg][b'validvalues'] = meta['validvalues']
519 520
520 521 # TODO this type of check should be defined in a per-command callback.
521 522 if (command == b'rawstorefiledata'
522 523 and not streamclone.allowservergeneration(repo)):
523 524 continue
524 525
525 526 caps['commands'][command] = {
526 527 'args': args,
527 528 'permissions': [entry.permission],
528 529 }
529 530
530 531 if entry.extracapabilitiesfn:
531 532 extracaps = entry.extracapabilitiesfn(repo, proto)
532 533 caps['commands'][command].update(extracaps)
533 534
534 535 caps['rawrepoformats'] = sorted(repo.requirements &
535 536 repo.supportedformats)
536 537
537 538 targets = getadvertisedredirecttargets(repo, proto)
538 539 if targets:
539 540 caps[b'redirect'] = {
540 541 b'targets': [],
541 542 b'hashes': [b'sha256', b'sha1'],
542 543 }
543 544
544 545 for target in targets:
545 546 entry = {
546 547 b'name': target['name'],
547 548 b'protocol': target['protocol'],
548 549 b'uris': target['uris'],
549 550 }
550 551
551 552 for key in ('snirequired', 'tlsversions'):
552 553 if key in target:
553 554 entry[key] = target[key]
554 555
555 556 caps[b'redirect'][b'targets'].append(entry)
556 557
557 558 return proto.addcapabilities(repo, caps)
558 559
559 560 def getadvertisedredirecttargets(repo, proto):
560 561 """Obtain a list of content redirect targets.
561 562
562 563 Returns a list containing potential redirect targets that will be
563 564 advertised in capabilities data. Each dict MUST have the following
564 565 keys:
565 566
566 567 name
567 568 The name of this redirect target. This is the identifier clients use
568 569 to refer to a target. It is transferred as part of every command
569 570 request.
570 571
571 572 protocol
572 573 Network protocol used by this target. Typically this is the string
573 574 in front of the ``://`` in a URL. e.g. ``https``.
574 575
575 576 uris
576 577 List of representative URIs for this target. Clients can use the
577 578 URIs to test parsing for compatibility or for ordering preference
578 579 for which target to use.
579 580
580 581 The following optional keys are recognized:
581 582
582 583 snirequired
583 584 Bool indicating if Server Name Indication (SNI) is required to
584 585 connect to this target.
585 586
586 587 tlsversions
587 588 List of bytes indicating which TLS versions are supported by this
588 589 target.
589 590
590 591 By default, clients reflect the target order advertised by servers
591 592 and servers will use the first client-advertised target when picking
592 593 a redirect target. So targets should be advertised in the order the
593 594 server prefers they be used.
594 595 """
595 596 return []
596 597
597 598 def wireprotocommand(name, args=None, permission='push', cachekeyfn=None,
598 599 extracapabilitiesfn=None):
599 600 """Decorator to declare a wire protocol command.
600 601
601 602 ``name`` is the name of the wire protocol command being provided.
602 603
603 604 ``args`` is a dict defining arguments accepted by the command. Keys are
604 605 the argument name. Values are dicts with the following keys:
605 606
606 607 ``type``
607 608 The argument data type. Must be one of the following string
608 609 literals: ``bytes``, ``int``, ``list``, ``dict``, ``set``,
609 610 or ``bool``.
610 611
611 612 ``default``
612 613 A callable returning the default value for this argument. If not
613 614 specified, ``None`` will be the default value.
614 615
615 616 ``example``
616 617 An example value for this argument.
617 618
618 619 ``validvalues``
619 620 Set of recognized values for this argument.
620 621
621 622 ``permission`` defines the permission type needed to run this command.
622 623 Can be ``push`` or ``pull``. These roughly map to read-write and read-only,
623 624 respectively. Default is to assume command requires ``push`` permissions
624 625 because otherwise commands not declaring their permissions could modify
625 626 a repository that is supposed to be read-only.
626 627
627 628 ``cachekeyfn`` defines an optional callable that can derive the
628 629 cache key for this request.
629 630
630 631 ``extracapabilitiesfn`` defines an optional callable that defines extra
631 632 command capabilities/parameters that are advertised next to the command
632 633 in the capabilities data structure describing the server. The callable
633 634 receives as arguments the repository and protocol objects. It returns
634 635 a dict of extra fields to add to the command descriptor.
635 636
636 637 Wire protocol commands are generators of objects to be serialized and
637 638 sent to the client.
638 639
639 640 If a command raises an uncaught exception, this will be translated into
640 641 a command error.
641 642
642 643 All commands can opt in to being cacheable by defining a function
643 644 (``cachekeyfn``) that is called to derive a cache key. This function
644 645 receives the same arguments as the command itself plus a ``cacher``
645 646 argument containing the active cacher for the request, and returns bytes
646 647 containing the cache key under which the response to this command may be
647 648 cached.
648 649 """
649 650 transports = {k for k, v in wireprototypes.TRANSPORTS.items()
650 651 if v['version'] == 2}
651 652
652 653 if permission not in ('push', 'pull'):
653 654 raise error.ProgrammingError('invalid wire protocol permission; '
654 655 'got %s; expected "push" or "pull"' %
655 656 permission)
656 657
657 658 if args is None:
658 659 args = {}
659 660
660 661 if not isinstance(args, dict):
661 662 raise error.ProgrammingError('arguments for version 2 commands '
662 663 'must be declared as dicts')
663 664
664 665 for arg, meta in args.items():
665 666 if arg == '*':
666 667 raise error.ProgrammingError('* argument name not allowed on '
667 668 'version 2 commands')
668 669
669 670 if not isinstance(meta, dict):
670 671 raise error.ProgrammingError('arguments for version 2 commands '
671 672 'must declare metadata as a dict')
672 673
673 674 if 'type' not in meta:
674 675 raise error.ProgrammingError('%s argument for command %s does not '
675 676 'declare type field' % (arg, name))
676 677
677 678 if meta['type'] not in ('bytes', 'int', 'list', 'dict', 'set', 'bool'):
678 679 raise error.ProgrammingError('%s argument for command %s has '
679 680 'illegal type: %s' % (arg, name,
680 681 meta['type']))
681 682
682 683 if 'example' not in meta:
683 684 raise error.ProgrammingError('%s argument for command %s does not '
684 685 'declare example field' % (arg, name))
685 686
686 687 meta['required'] = 'default' not in meta
687 688
688 689 meta.setdefault('default', lambda: None)
689 690 meta.setdefault('validvalues', None)
690 691
691 692 def register(func):
692 693 if name in COMMANDS:
693 694 raise error.ProgrammingError('%s command already registered '
694 695 'for version 2' % name)
695 696
696 697 COMMANDS[name] = wireprototypes.commandentry(
697 698 func, args=args, transports=transports, permission=permission,
698 699 cachekeyfn=cachekeyfn, extracapabilitiesfn=extracapabilitiesfn)
699 700
700 701 return func
701 702
702 703 return register
703 704
704 705 def makecommandcachekeyfn(command, localversion=None, allargs=False):
705 706 """Construct a cache key derivation function with common features.
706 707
707 708 By default, the cache key is a hash of:
708 709
709 710 * The command name.
710 711 * A global cache version number.
711 712 * A local cache version number (passed via ``localversion``).
712 713 * All the arguments passed to the command.
713 714 * The media type used.
714 715 * Wire protocol version string.
715 716 * The repository path.
716 717 """
717 718 if not allargs:
718 719 raise error.ProgrammingError('only allargs=True is currently supported')
719 720
720 721 if localversion is None:
721 722 raise error.ProgrammingError('must set localversion argument value')
722 723
723 724 def cachekeyfn(repo, proto, cacher, **args):
724 725 spec = COMMANDS[command]
725 726
726 727 # Commands that mutate the repo can not be cached.
727 728 if spec.permission == 'push':
728 729 return None
729 730
730 731 # TODO config option to disable caching.
731 732
732 733 # Our key derivation strategy is to construct a data structure
733 734 # holding everything that could influence cacheability and to hash
734 735 # the CBOR representation of that. Using CBOR seems like it might
735 736 # be overkill. However, simpler hashing mechanisms are prone to
736 737 # duplicate input issues. e.g. if you just concatenate two values,
737 738 # "foo"+"bar" is identical to "fo"+"obar". Using CBOR provides
738 739 # "padding" between values and prevents these problems.
739 740
740 741 # Seed the hash with various data.
741 742 state = {
742 743 # To invalidate all cache keys.
743 744 b'globalversion': GLOBAL_CACHE_VERSION,
744 745 # More granular cache key invalidation.
745 746 b'localversion': localversion,
746 747 # Cache keys are segmented by command.
747 748 b'command': pycompat.sysbytes(command),
748 749 # Throw in the media type and API version strings so changes
749 750 # to exchange semantics invalidate caches.
750 751 b'mediatype': FRAMINGTYPE,
751 752 b'version': HTTP_WIREPROTO_V2,
752 753 # So same requests for different repos don't share cache keys.
753 754 b'repo': repo.root,
754 755 }
755 756
756 757 # The arguments passed to us will have already been normalized.
757 758 # Default values will be set, etc. This is important because it
758 759 # means that it doesn't matter if clients send an explicit argument
759 760 # or rely on the default value: it will all normalize to the same
760 761 # set of arguments on the server and therefore the same cache key.
761 762 #
762 763 # Arguments by their very nature must support being encoded to CBOR.
763 764 # And the CBOR encoder is deterministic. So we hash the arguments
764 765 # by feeding the CBOR of their representation into the hasher.
765 766 if allargs:
766 767 state[b'args'] = pycompat.byteskwargs(args)
767 768
768 769 cacher.adjustcachekeystate(state)
769 770
770 771 hasher = hashlib.sha1()
771 772 for chunk in cborutil.streamencode(state):
772 773 hasher.update(chunk)
773 774
774 775 return pycompat.sysbytes(hasher.hexdigest())
775 776
776 777 return cachekeyfn
777 778
778 779 def makeresponsecacher(repo, proto, command, args, objencoderfn,
779 780 redirecttargets, redirecthashes):
780 781 """Construct a cacher for a cacheable command.
781 782
782 783 Returns an ``iwireprotocolcommandcacher`` instance.
783 784
784 785 Extensions can monkeypatch this function to provide custom caching
785 786 backends.
786 787 """
787 788 return None
788 789
789 790 def resolvenodes(repo, revisions):
790 791 """Resolve nodes from a revisions specifier data structure."""
791 792 cl = repo.changelog
792 793 clhasnode = cl.hasnode
793 794
794 795 seen = set()
795 796 nodes = []
796 797
797 798 if not isinstance(revisions, list):
798 799 raise error.WireprotoCommandError('revisions must be defined as an '
799 800 'array')
800 801
801 802 for spec in revisions:
802 803 if b'type' not in spec:
803 804 raise error.WireprotoCommandError(
804 805 'type key not present in revision specifier')
805 806
806 807 typ = spec[b'type']
807 808
808 809 if typ == b'changesetexplicit':
809 810 if b'nodes' not in spec:
810 811 raise error.WireprotoCommandError(
811 812 'nodes key not present in changesetexplicit revision '
812 813 'specifier')
813 814
814 815 for node in spec[b'nodes']:
815 816 if node not in seen:
816 817 nodes.append(node)
817 818 seen.add(node)
818 819
819 820 elif typ == b'changesetexplicitdepth':
820 821 for key in (b'nodes', b'depth'):
821 822 if key not in spec:
822 823 raise error.WireprotoCommandError(
823 824 '%s key not present in changesetexplicitdepth revision '
824 825 'specifier', (key,))
825 826
826 827 for rev in repo.revs(b'ancestors(%ln, %s)', spec[b'nodes'],
827 828 spec[b'depth'] - 1):
828 829 node = cl.node(rev)
829 830
830 831 if node not in seen:
831 832 nodes.append(node)
832 833 seen.add(node)
833 834
834 835 elif typ == b'changesetdagrange':
835 836 for key in (b'roots', b'heads'):
836 837 if key not in spec:
837 838 raise error.WireprotoCommandError(
838 839 '%s key not present in changesetdagrange revision '
839 840 'specifier', (key,))
840 841
841 842 if not spec[b'heads']:
842 843 raise error.WireprotoCommandError(
843 844 'heads key in changesetdagrange cannot be empty')
844 845
845 846 if spec[b'roots']:
846 847 common = [n for n in spec[b'roots'] if clhasnode(n)]
847 848 else:
848 849 common = [nullid]
849 850
850 851 for n in discovery.outgoing(repo, common, spec[b'heads']).missing:
851 852 if n not in seen:
852 853 nodes.append(n)
853 854 seen.add(n)
854 855
855 856 else:
856 857 raise error.WireprotoCommandError(
857 858 'unknown revision specifier type: %s', (typ,))
858 859
859 860 return nodes
860 861
861 862 @wireprotocommand('branchmap', permission='pull')
862 863 def branchmapv2(repo, proto):
863 864 yield {encoding.fromlocal(k): v
864 865 for k, v in repo.branchmap().iteritems()}
865 866
866 867 @wireprotocommand('capabilities', permission='pull')
867 868 def capabilitiesv2(repo, proto):
868 869 yield _capabilitiesv2(repo, proto)
869 870
870 871 @wireprotocommand(
871 872 'changesetdata',
872 873 args={
873 874 'revisions': {
874 875 'type': 'list',
875 876 'example': [{
876 877 b'type': b'changesetexplicit',
877 878 b'nodes': [b'abcdef...'],
878 879 }],
879 880 },
880 881 'fields': {
881 882 'type': 'set',
882 883 'default': set,
883 884 'example': {b'parents', b'revision'},
884 885 'validvalues': {b'bookmarks', b'parents', b'phase', b'revision'},
885 886 },
886 887 },
887 888 permission='pull')
888 889 def changesetdata(repo, proto, revisions, fields):
889 890 # TODO look for unknown fields and abort when they can't be serviced.
890 891 # This could probably be validated by dispatcher using validvalues.
891 892
892 893 cl = repo.changelog
893 894 outgoing = resolvenodes(repo, revisions)
894 895 publishing = repo.publishing()
895 896
896 897 if outgoing:
897 898 repo.hook('preoutgoing', throw=True, source='serve')
898 899
899 900 yield {
900 901 b'totalitems': len(outgoing),
901 902 }
902 903
903 904 # The phases of nodes already transferred to the client may have changed
904 905 # since the client last requested data. We send phase-only records
905 906 # for these revisions, if requested.
906 907 # TODO actually do this. We'll probably want to emit phase heads
907 908 # in the ancestry set of the outgoing revisions. This will ensure
908 909 # that phase updates within that set are seen.
909 910 if b'phase' in fields:
910 911 pass
911 912
912 913 nodebookmarks = {}
913 914 for mark, node in repo._bookmarks.items():
914 915 nodebookmarks.setdefault(node, set()).add(mark)
915 916
916 917 # It is already topologically sorted by revision number.
917 918 for node in outgoing:
918 919 d = {
919 920 b'node': node,
920 921 }
921 922
922 923 if b'parents' in fields:
923 924 d[b'parents'] = cl.parents(node)
924 925
925 926 if b'phase' in fields:
926 927 if publishing:
927 928 d[b'phase'] = b'public'
928 929 else:
929 930 ctx = repo[node]
930 931 d[b'phase'] = ctx.phasestr()
931 932
932 933 if b'bookmarks' in fields and node in nodebookmarks:
933 934 d[b'bookmarks'] = sorted(nodebookmarks[node])
934 935 del nodebookmarks[node]
935 936
936 937 followingmeta = []
937 938 followingdata = []
938 939
939 940 if b'revision' in fields:
940 941 revisiondata = cl.revision(node, raw=True)
941 942 followingmeta.append((b'revision', len(revisiondata)))
942 943 followingdata.append(revisiondata)
943 944
944 945 # TODO make it possible for extensions to wrap a function or register
945 946 # a handler to service custom fields.
946 947
947 948 if followingmeta:
948 949 d[b'fieldsfollowing'] = followingmeta
949 950
950 951 yield d
951 952
952 953 for extra in followingdata:
953 954 yield extra
954 955
955 956 # If requested, send bookmarks from nodes that didn't have revision
956 957 # data sent so receiver is aware of any bookmark updates.
957 958 if b'bookmarks' in fields:
958 959 for node, marks in sorted(nodebookmarks.iteritems()):
959 960 yield {
960 961 b'node': node,
961 962 b'bookmarks': sorted(marks),
962 963 }
963 964
964 965 class FileAccessError(Exception):
965 966 """Represents an error accessing a specific file."""
966 967
967 968 def __init__(self, path, msg, args):
968 969 self.path = path
969 970 self.msg = msg
970 971 self.args = args
971 972
972 973 def getfilestore(repo, proto, path):
973 974 """Obtain a file storage object for use with wire protocol.
974 975
975 976 Exists as a standalone function so extensions can monkeypatch to add
976 977 access control.
977 978 """
978 979 # This seems to work even if the file doesn't exist. So catch
979 980 # "empty" files and return an error.
980 981 fl = repo.file(path)
981 982
982 983 if not len(fl):
983 984 raise FileAccessError(path, 'unknown file: %s', (path,))
984 985
985 986 return fl
986 987
987 988 def emitfilerevisions(repo, path, revisions, linknodes, fields):
988 989 for revision in revisions:
989 990 d = {
990 991 b'node': revision.node,
991 992 }
992 993
993 994 if b'parents' in fields:
994 995 d[b'parents'] = [revision.p1node, revision.p2node]
995 996
996 997 if b'linknode' in fields:
997 998 d[b'linknode'] = linknodes[revision.node]
998 999
999 1000 followingmeta = []
1000 1001 followingdata = []
1001 1002
1002 1003 if b'revision' in fields:
1003 1004 if revision.revision is not None:
1004 1005 followingmeta.append((b'revision', len(revision.revision)))
1005 1006 followingdata.append(revision.revision)
1006 1007 else:
1007 1008 d[b'deltabasenode'] = revision.basenode
1008 1009 followingmeta.append((b'delta', len(revision.delta)))
1009 1010 followingdata.append(revision.delta)
1010 1011
1011 1012 if followingmeta:
1012 1013 d[b'fieldsfollowing'] = followingmeta
1013 1014
1014 1015 yield d
1015 1016
1016 1017 for extra in followingdata:
1017 1018 yield extra
1018 1019
1019 1020 def makefilematcher(repo, pathfilter):
1020 1021 """Construct a matcher from a path filter dict."""
1021 1022
1022 1023 # Validate values.
1023 1024 if pathfilter:
1024 1025 for key in (b'include', b'exclude'):
1025 1026 for pattern in pathfilter.get(key, []):
1026 1027 if not pattern.startswith((b'path:', b'rootfilesin:')):
1027 1028 raise error.WireprotoCommandError(
1028 1029 '%s pattern must begin with `path:` or `rootfilesin:`; '
1029 1030 'got %s', (key, pattern))
1030 1031
1031 1032 if pathfilter:
1032 1033 matcher = matchmod.match(repo.root, b'',
1033 1034 include=pathfilter.get(b'include', []),
1034 1035 exclude=pathfilter.get(b'exclude', []))
1035 1036 else:
1036 1037 matcher = matchmod.match(repo.root, b'')
1037 1038
1038 1039 # Requested patterns could include files not in the local store. So
1039 1040 # filter those out.
1040 1041 return repo.narrowmatch(matcher)
1041 1042
1042 1043 @wireprotocommand(
1043 1044 'filedata',
1044 1045 args={
1045 1046 'haveparents': {
1046 1047 'type': 'bool',
1047 1048 'default': lambda: False,
1048 1049 'example': True,
1049 1050 },
1050 1051 'nodes': {
1051 1052 'type': 'list',
1052 1053 'example': [b'0123456...'],
1053 1054 },
1054 1055 'fields': {
1055 1056 'type': 'set',
1056 1057 'default': set,
1057 1058 'example': {b'parents', b'revision'},
1058 1059 'validvalues': {b'parents', b'revision', b'linknode'},
1059 1060 },
1060 1061 'path': {
1061 1062 'type': 'bytes',
1062 1063 'example': b'foo.txt',
1063 1064 }
1064 1065 },
1065 1066 permission='pull',
1066 1067 # TODO censoring a file revision won't invalidate the cache.
1067 1068 # Figure out a way to take censoring into account when deriving
1068 1069 # the cache key.
1069 1070 cachekeyfn=makecommandcachekeyfn('filedata', 1, allargs=True))
1070 1071 def filedata(repo, proto, haveparents, nodes, fields, path):
1071 1072 # TODO this API allows access to file revisions that are attached to
1072 1073 # secret changesets. filesdata does not have this problem. Maybe this
1073 1074 # API should be deleted?
1074 1075
1075 1076 try:
1076 1077 # Extensions may wish to access the protocol handler.
1077 1078 store = getfilestore(repo, proto, path)
1078 1079 except FileAccessError as e:
1079 1080 raise error.WireprotoCommandError(e.msg, e.args)
1080 1081
1081 1082 clnode = repo.changelog.node
1082 1083 linknodes = {}
1083 1084
1084 1085 # Validate requested nodes.
1085 1086 for node in nodes:
1086 1087 try:
1087 1088 store.rev(node)
1088 1089 except error.LookupError:
1089 1090 raise error.WireprotoCommandError('unknown file node: %s',
1090 1091 (hex(node),))
1091 1092
1092 1093 # TODO by creating the filectx against a specific file revision
1093 1094 # instead of changeset, linkrev() is always used. This is wrong for
1094 1095 # cases where linkrev() may refer to a hidden changeset. But since this
1095 1096 # API doesn't know anything about changesets, we're not sure how to
1096 1097 # disambiguate the linknode. Perhaps we should delete this API?
1097 1098 fctx = repo.filectx(path, fileid=node)
1098 1099 linknodes[node] = clnode(fctx.introrev())
1099 1100
1100 1101 revisions = store.emitrevisions(nodes,
1101 1102 revisiondata=b'revision' in fields,
1102 1103 assumehaveparentrevisions=haveparents)
1103 1104
1104 1105 yield {
1105 1106 b'totalitems': len(nodes),
1106 1107 }
1107 1108
1108 1109 for o in emitfilerevisions(repo, path, revisions, linknodes, fields):
1109 1110 yield o
1110 1111
1111 1112 def filesdatacapabilities(repo, proto):
1112 1113 batchsize = repo.ui.configint(
1113 1114 b'experimental', b'server.filesdata.recommended-batch-size')
1114 1115 return {
1115 1116 b'recommendedbatchsize': batchsize,
1116 1117 }
1117 1118
1118 1119 @wireprotocommand(
1119 1120 'filesdata',
1120 1121 args={
1121 1122 'haveparents': {
1122 1123 'type': 'bool',
1123 1124 'default': lambda: False,
1124 1125 'example': True,
1125 1126 },
1126 1127 'fields': {
1127 1128 'type': 'set',
1128 1129 'default': set,
1129 1130 'example': {b'parents', b'revision'},
1130 1131 'validvalues': {b'firstchangeset', b'linknode', b'parents',
1131 1132 b'revision'},
1132 1133 },
1133 1134 'pathfilter': {
1134 1135 'type': 'dict',
1135 1136 'default': lambda: None,
1136 1137 'example': {b'include': [b'path:tests']},
1137 1138 },
1138 1139 'revisions': {
1139 1140 'type': 'list',
1140 1141 'example': [{
1141 1142 b'type': b'changesetexplicit',
1142 1143 b'nodes': [b'abcdef...'],
1143 1144 }],
1144 1145 },
1145 1146 },
1146 1147 permission='pull',
1147 1148 # TODO censoring a file revision won't invalidate the cache.
1148 1149 # Figure out a way to take censoring into account when deriving
1149 1150 # the cache key.
1150 1151 cachekeyfn=makecommandcachekeyfn('filesdata', 1, allargs=True),
1151 1152 extracapabilitiesfn=filesdatacapabilities)
1152 1153 def filesdata(repo, proto, haveparents, fields, pathfilter, revisions):
1153 1154 # TODO This should operate on a repo that exposes obsolete changesets. There
1154 1155 # is a race between a client making a push that obsoletes a changeset and
1155 1156 # another client fetching files data for that changeset. If a client has a
1156 1157 # changeset, it should probably be allowed to access files data for that
1157 1158 # changeset.
1158 1159
1159 1160 outgoing = resolvenodes(repo, revisions)
1160 1161 filematcher = makefilematcher(repo, pathfilter)
1161 1162
1162 1163 # path -> {fnode: linknode}
1163 1164 fnodes = collections.defaultdict(dict)
1164 1165
1165 1166 # We collect the set of relevant file revisions by iterating the changeset
1166 1167 # revisions and either walking the set of files recorded in the changeset
1167 1168 # or by walking the manifest at that revision. There is probably room for a
1168 1169 # storage-level API to request this data, as it can be expensive to compute
1169 1170 # and would benefit from caching or alternate storage from what revlogs
1170 1171 # provide.
1171 1172 for node in outgoing:
1172 1173 ctx = repo[node]
1173 1174 mctx = ctx.manifestctx()
1174 1175 md = mctx.read()
1175 1176
1176 1177 if haveparents:
1177 1178 checkpaths = ctx.files()
1178 1179 else:
1179 1180 checkpaths = md.keys()
1180 1181
1181 1182 for path in checkpaths:
1182 1183 fnode = md[path]
1183 1184
1184 1185 if path in fnodes and fnode in fnodes[path]:
1185 1186 continue
1186 1187
1187 1188 if not filematcher(path):
1188 1189 continue
1189 1190
1190 1191 fnodes[path].setdefault(fnode, node)
1191 1192
1192 1193 yield {
1193 1194 b'totalpaths': len(fnodes),
1194 1195 b'totalitems': sum(len(v) for v in fnodes.values())
1195 1196 }
1196 1197
1197 1198 for path, filenodes in sorted(fnodes.items()):
1198 1199 try:
1199 1200 store = getfilestore(repo, proto, path)
1200 1201 except FileAccessError as e:
1201 1202 raise error.WireprotoCommandError(e.msg, e.args)
1202 1203
1203 1204 yield {
1204 1205 b'path': path,
1205 1206 b'totalitems': len(filenodes),
1206 1207 }
1207 1208
1208 1209 revisions = store.emitrevisions(filenodes.keys(),
1209 1210 revisiondata=b'revision' in fields,
1210 1211 assumehaveparentrevisions=haveparents)
1211 1212
1212 1213 for o in emitfilerevisions(repo, path, revisions, filenodes, fields):
1213 1214 yield o
1214 1215
1215 1216 @wireprotocommand(
1216 1217 'heads',
1217 1218 args={
1218 1219 'publiconly': {
1219 1220 'type': 'bool',
1220 1221 'default': lambda: False,
1221 1222 'example': False,
1222 1223 },
1223 1224 },
1224 1225 permission='pull')
1225 1226 def headsv2(repo, proto, publiconly):
1226 1227 if publiconly:
1227 1228 repo = repo.filtered('immutable')
1228 1229
1229 1230 yield repo.heads()
1230 1231
1231 1232 @wireprotocommand(
1232 1233 'known',
1233 1234 args={
1234 1235 'nodes': {
1235 1236 'type': 'list',
1236 1237 'default': list,
1237 1238 'example': [b'deadbeef'],
1238 1239 },
1239 1240 },
1240 1241 permission='pull')
1241 1242 def knownv2(repo, proto, nodes):
1242 1243 result = b''.join(b'1' if n else b'0' for n in repo.known(nodes))
1243 1244 yield result
1244 1245
1245 1246 @wireprotocommand(
1246 1247 'listkeys',
1247 1248 args={
1248 1249 'namespace': {
1249 1250 'type': 'bytes',
1250 1251 'example': b'ns',
1251 1252 },
1252 1253 },
1253 1254 permission='pull')
1254 1255 def listkeysv2(repo, proto, namespace):
1255 1256 keys = repo.listkeys(encoding.tolocal(namespace))
1256 1257 keys = {encoding.fromlocal(k): encoding.fromlocal(v)
1257 1258 for k, v in keys.iteritems()}
1258 1259
1259 1260 yield keys
1260 1261
1261 1262 @wireprotocommand(
1262 1263 'lookup',
1263 1264 args={
1264 1265 'key': {
1265 1266 'type': 'bytes',
1266 1267 'example': b'foo',
1267 1268 },
1268 1269 },
1269 1270 permission='pull')
1270 1271 def lookupv2(repo, proto, key):
1271 1272 key = encoding.tolocal(key)
1272 1273
1273 1274 # TODO handle exception.
1274 1275 node = repo.lookup(key)
1275 1276
1276 1277 yield node
1277 1278
1278 1279 def manifestdatacapabilities(repo, proto):
1279 1280 batchsize = repo.ui.configint(
1280 1281 b'experimental', b'server.manifestdata.recommended-batch-size')
1281 1282
1282 1283 return {
1283 1284 b'recommendedbatchsize': batchsize,
1284 1285 }
1285 1286
1286 1287 @wireprotocommand(
1287 1288 'manifestdata',
1288 1289 args={
1289 1290 'nodes': {
1290 1291 'type': 'list',
1291 1292 'example': [b'0123456...'],
1292 1293 },
1293 1294 'haveparents': {
1294 1295 'type': 'bool',
1295 1296 'default': lambda: False,
1296 1297 'example': True,
1297 1298 },
1298 1299 'fields': {
1299 1300 'type': 'set',
1300 1301 'default': set,
1301 1302 'example': {b'parents', b'revision'},
1302 1303 'validvalues': {b'parents', b'revision'},
1303 1304 },
1304 1305 'tree': {
1305 1306 'type': 'bytes',
1306 1307 'example': b'',
1307 1308 },
1308 1309 },
1309 1310 permission='pull',
1310 1311 cachekeyfn=makecommandcachekeyfn('manifestdata', 1, allargs=True),
1311 1312 extracapabilitiesfn=manifestdatacapabilities)
1312 1313 def manifestdata(repo, proto, haveparents, nodes, fields, tree):
1313 1314 store = repo.manifestlog.getstorage(tree)
1314 1315
1315 1316 # Validate the node is known and abort on unknown revisions.
1316 1317 for node in nodes:
1317 1318 try:
1318 1319 store.rev(node)
1319 1320 except error.LookupError:
1320 1321 raise error.WireprotoCommandError(
1321 1322 'unknown node: %s', (node,))
1322 1323
1323 1324 revisions = store.emitrevisions(nodes,
1324 1325 revisiondata=b'revision' in fields,
1325 1326 assumehaveparentrevisions=haveparents)
1326 1327
1327 1328 yield {
1328 1329 b'totalitems': len(nodes),
1329 1330 }
1330 1331
1331 1332 for revision in revisions:
1332 1333 d = {
1333 1334 b'node': revision.node,
1334 1335 }
1335 1336
1336 1337 if b'parents' in fields:
1337 1338 d[b'parents'] = [revision.p1node, revision.p2node]
1338 1339
1339 1340 followingmeta = []
1340 1341 followingdata = []
1341 1342
1342 1343 if b'revision' in fields:
1343 1344 if revision.revision is not None:
1344 1345 followingmeta.append((b'revision', len(revision.revision)))
1345 1346 followingdata.append(revision.revision)
1346 1347 else:
1347 1348 d[b'deltabasenode'] = revision.basenode
1348 1349 followingmeta.append((b'delta', len(revision.delta)))
1349 1350 followingdata.append(revision.delta)
1350 1351
1351 1352 if followingmeta:
1352 1353 d[b'fieldsfollowing'] = followingmeta
1353 1354
1354 1355 yield d
1355 1356
1356 1357 for extra in followingdata:
1357 1358 yield extra
1358 1359
1359 1360 @wireprotocommand(
1360 1361 'pushkey',
1361 1362 args={
1362 1363 'namespace': {
1363 1364 'type': 'bytes',
1364 1365 'example': b'ns',
1365 1366 },
1366 1367 'key': {
1367 1368 'type': 'bytes',
1368 1369 'example': b'key',
1369 1370 },
1370 1371 'old': {
1371 1372 'type': 'bytes',
1372 1373 'example': b'old',
1373 1374 },
1374 1375 'new': {
1375 1376 'type': 'bytes',
1376 1377 'example': 'new',
1377 1378 },
1378 1379 },
1379 1380 permission='push')
1380 1381 def pushkeyv2(repo, proto, namespace, key, old, new):
1381 1382 # TODO handle ui output redirection
1382 1383 yield repo.pushkey(encoding.tolocal(namespace),
1383 1384 encoding.tolocal(key),
1384 1385 encoding.tolocal(old),
1385 1386 encoding.tolocal(new))
1386 1387
1387 1388
1388 1389 @wireprotocommand(
1389 1390 'rawstorefiledata',
1390 1391 args={
1391 1392 'files': {
1392 1393 'type': 'list',
1393 1394 'example': [b'changelog', b'manifestlog'],
1394 1395 },
1395 1396 'pathfilter': {
1396 1397 'type': 'list',
1397 1398 'default': lambda: None,
1398 1399 'example': {b'include': [b'path:tests']},
1399 1400 },
1400 1401 },
1401 1402 permission='pull')
1402 1403 def rawstorefiledata(repo, proto, files, pathfilter):
1403 1404 if not streamclone.allowservergeneration(repo):
1404 1405 raise error.WireprotoCommandError(b'stream clone is disabled')
1405 1406
1406 1407 # TODO support dynamically advertising what store files "sets" are
1407 1408 # available. For now, we support changelog, manifestlog, and files.
1408 1409 files = set(files)
1409 1410 allowedfiles = {b'changelog', b'manifestlog'}
1410 1411
1411 1412 unsupported = files - allowedfiles
1412 1413 if unsupported:
1413 1414 raise error.WireprotoCommandError(b'unknown file type: %s',
1414 1415 (b', '.join(sorted(unsupported)),))
1415 1416
1416 1417 with repo.lock():
1417 1418 topfiles = list(repo.store.topfiles())
1418 1419
1419 1420 sendfiles = []
1420 1421 totalsize = 0
1421 1422
1422 1423 # TODO this is a bunch of storage layer interface abstractions because
1423 1424 # it assumes revlogs.
1424 1425 for name, encodedname, size in topfiles:
1425 1426 if b'changelog' in files and name.startswith(b'00changelog'):
1426 1427 pass
1427 1428 elif b'manifestlog' in files and name.startswith(b'00manifest'):
1428 1429 pass
1429 1430 else:
1430 1431 continue
1431 1432
1432 1433 sendfiles.append((b'store', name, size))
1433 1434 totalsize += size
1434 1435
1435 1436 yield {
1436 1437 b'filecount': len(sendfiles),
1437 1438 b'totalsize': totalsize,
1438 1439 }
1439 1440
1440 1441 for location, name, size in sendfiles:
1441 1442 yield {
1442 1443 b'location': location,
1443 1444 b'path': name,
1444 1445 b'size': size,
1445 1446 }
1446 1447
1447 1448 # We have to use a closure for this to ensure the context manager is
1448 1449 # closed only after sending the final chunk.
1449 1450 def getfiledata():
1450 1451 with repo.svfs(name, 'rb', auditpath=False) as fh:
1451 1452 for chunk in util.filechunkiter(fh, limit=size):
1452 1453 yield chunk
1453 1454
1454 1455 yield wireprototypes.indefinitebytestringresponse(
1455 1456 getfiledata())
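For reference, the cache key derivation in makecommandcachekeyfn hashes a CBOR encoding of a state dictionary rather than concatenating values, to avoid the "foo"+"bar" versus "fo"+"obar" collision called out in the comments. The standalone sketch below illustrates that reasoning with a simple length-prefixed encoding as a crude stand-in for cborutil.streamencode; it is not the code path Mercurial actually uses.

import hashlib

def naivekey(values):
    # Plain concatenation: distinct inputs can produce identical hashes.
    h = hashlib.sha1()
    for v in values:
        h.update(v)
    return h.hexdigest()

def framedkey(values):
    # Length-prefix each value (a crude stand-in for CBOR framing) so value
    # boundaries become part of the hashed input.
    h = hashlib.sha1()
    for v in values:
        h.update(b'%d:' % len(v))
        h.update(v)
    return h.hexdigest()

assert naivekey([b'foo', b'bar']) == naivekey([b'fo', b'obar'])    # collision
assert framedkey([b'foo', b'bar']) != framedkey([b'fo', b'obar'])  # distinct keys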