##// END OF EJS Templates
rawdata: update caller in wireprotov2server...
marmoute -
r43042:da643cad default
parent child Browse files
Show More
@@ -1,1455 +1,1455 b''
1 1 # Copyright 21 May 2005 - (c) 2005 Jake Edge <jake@edge2.net>
2 2 # Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
3 3 #
4 4 # This software may be used and distributed according to the terms of the
5 5 # GNU General Public License version 2 or any later version.
6 6
7 7 from __future__ import absolute_import
8 8
9 9 import collections
10 10 import contextlib
11 11 import hashlib
12 12
13 13 from .i18n import _
14 14 from .node import (
15 15 hex,
16 16 nullid,
17 17 )
18 18 from . import (
19 19 discovery,
20 20 encoding,
21 21 error,
22 22 match as matchmod,
23 23 narrowspec,
24 24 pycompat,
25 25 streamclone,
26 26 templatefilters,
27 27 util,
28 28 wireprotoframing,
29 29 wireprototypes,
30 30 )
31 31 from .utils import (
32 32 cborutil,
33 33 interfaceutil,
34 34 stringutil,
35 35 )
36 36
37 37 FRAMINGTYPE = b'application/mercurial-exp-framing-0006'
38 38
39 39 HTTP_WIREPROTO_V2 = wireprototypes.HTTP_WIREPROTO_V2
40 40
41 41 COMMANDS = wireprototypes.commanddict()
42 42
43 43 # Value inserted into cache key computation function. Change the value to
44 44 # force new cache keys for every command request. This should be done when
45 45 # there is a change to how caching works, etc.
46 46 GLOBAL_CACHE_VERSION = 1
47 47
48 48 def handlehttpv2request(rctx, req, res, checkperm, urlparts):
49 49 from .hgweb import common as hgwebcommon
50 50
51 51 # URL space looks like: <permissions>/<command>, where <permission> can
52 52 # be ``ro`` or ``rw`` to signal read-only or read-write, respectively.
53 53
54 54 # Root URL does nothing meaningful... yet.
55 55 if not urlparts:
56 56 res.status = b'200 OK'
57 57 res.headers[b'Content-Type'] = b'text/plain'
58 58 res.setbodybytes(_('HTTP version 2 API handler'))
59 59 return
60 60
61 61 if len(urlparts) == 1:
62 62 res.status = b'404 Not Found'
63 63 res.headers[b'Content-Type'] = b'text/plain'
64 64 res.setbodybytes(_('do not know how to process %s\n') %
65 65 req.dispatchpath)
66 66 return
67 67
68 68 permission, command = urlparts[0:2]
69 69
70 70 if permission not in (b'ro', b'rw'):
71 71 res.status = b'404 Not Found'
72 72 res.headers[b'Content-Type'] = b'text/plain'
73 73 res.setbodybytes(_('unknown permission: %s') % permission)
74 74 return
75 75
76 76 if req.method != 'POST':
77 77 res.status = b'405 Method Not Allowed'
78 78 res.headers[b'Allow'] = b'POST'
79 79 res.setbodybytes(_('commands require POST requests'))
80 80 return
81 81
82 82 # At some point we'll want to use our own API instead of recycling the
83 83 # behavior of version 1 of the wire protocol...
84 84 # TODO return reasonable responses - not responses that overload the
85 85 # HTTP status line message for error reporting.
86 86 try:
87 87 checkperm(rctx, req, 'pull' if permission == b'ro' else 'push')
88 88 except hgwebcommon.ErrorResponse as e:
89 89 res.status = hgwebcommon.statusmessage(e.code, pycompat.bytestr(e))
90 90 for k, v in e.headers:
91 91 res.headers[k] = v
92 92 res.setbodybytes('permission denied')
93 93 return
94 94
95 95 # We have a special endpoint to reflect the request back at the client.
96 96 if command == b'debugreflect':
97 97 _processhttpv2reflectrequest(rctx.repo.ui, rctx.repo, req, res)
98 98 return
99 99
100 100 # Extra commands that we handle that aren't really wire protocol
101 101 # commands. Think extra hard before making this hackery available to
102 102 # extension.
103 103 extracommands = {'multirequest'}
104 104
105 105 if command not in COMMANDS and command not in extracommands:
106 106 res.status = b'404 Not Found'
107 107 res.headers[b'Content-Type'] = b'text/plain'
108 108 res.setbodybytes(_('unknown wire protocol command: %s\n') % command)
109 109 return
110 110
111 111 repo = rctx.repo
112 112 ui = repo.ui
113 113
114 114 proto = httpv2protocolhandler(req, ui)
115 115
116 116 if (not COMMANDS.commandavailable(command, proto)
117 117 and command not in extracommands):
118 118 res.status = b'404 Not Found'
119 119 res.headers[b'Content-Type'] = b'text/plain'
120 120 res.setbodybytes(_('invalid wire protocol command: %s') % command)
121 121 return
122 122
123 123 # TODO consider cases where proxies may add additional Accept headers.
124 124 if req.headers.get(b'Accept') != FRAMINGTYPE:
125 125 res.status = b'406 Not Acceptable'
126 126 res.headers[b'Content-Type'] = b'text/plain'
127 127 res.setbodybytes(_('client MUST specify Accept header with value: %s\n')
128 128 % FRAMINGTYPE)
129 129 return
130 130
131 131 if req.headers.get(b'Content-Type') != FRAMINGTYPE:
132 132 res.status = b'415 Unsupported Media Type'
133 133 # TODO we should send a response with appropriate media type,
134 134 # since client does Accept it.
135 135 res.headers[b'Content-Type'] = b'text/plain'
136 136 res.setbodybytes(_('client MUST send Content-Type header with '
137 137 'value: %s\n') % FRAMINGTYPE)
138 138 return
139 139
140 140 _processhttpv2request(ui, repo, req, res, permission, command, proto)
141 141
142 142 def _processhttpv2reflectrequest(ui, repo, req, res):
143 143 """Reads unified frame protocol request and dumps out state to client.
144 144
145 145 This special endpoint can be used to help debug the wire protocol.
146 146
147 147 Instead of routing the request through the normal dispatch mechanism,
148 148 we instead read all frames, decode them, and feed them into our state
149 149 tracker. We then dump the log of all that activity back out to the
150 150 client.
151 151 """
152 152 # Reflection APIs have a history of being abused, accidentally disclosing
153 153 # sensitive data, etc. So we have a config knob.
154 154 if not ui.configbool('experimental', 'web.api.debugreflect'):
155 155 res.status = b'404 Not Found'
156 156 res.headers[b'Content-Type'] = b'text/plain'
157 157 res.setbodybytes(_('debugreflect service not available'))
158 158 return
159 159
160 160 # We assume we have a unified framing protocol request body.
161 161
162 162 reactor = wireprotoframing.serverreactor(ui)
163 163 states = []
164 164
165 165 while True:
166 166 frame = wireprotoframing.readframe(req.bodyfh)
167 167
168 168 if not frame:
169 169 states.append(b'received: <no frame>')
170 170 break
171 171
172 172 states.append(b'received: %d %d %d %s' % (frame.typeid, frame.flags,
173 173 frame.requestid,
174 174 frame.payload))
175 175
176 176 action, meta = reactor.onframerecv(frame)
177 177 states.append(templatefilters.json((action, meta)))
178 178
179 179 action, meta = reactor.oninputeof()
180 180 meta['action'] = action
181 181 states.append(templatefilters.json(meta))
182 182
183 183 res.status = b'200 OK'
184 184 res.headers[b'Content-Type'] = b'text/plain'
185 185 res.setbodybytes(b'\n'.join(states))
186 186
187 187 def _processhttpv2request(ui, repo, req, res, authedperm, reqcommand, proto):
188 188 """Post-validation handler for HTTPv2 requests.
189 189
190 190 Called when the HTTP request contains unified frame-based protocol
191 191 frames for evaluation.
192 192 """
193 193 # TODO Some HTTP clients are full duplex and can receive data before
194 194 # the entire request is transmitted. Figure out a way to indicate support
195 195 # for that so we can opt into full duplex mode.
196 196 reactor = wireprotoframing.serverreactor(ui, deferoutput=True)
197 197 seencommand = False
198 198
199 199 outstream = None
200 200
201 201 while True:
202 202 frame = wireprotoframing.readframe(req.bodyfh)
203 203 if not frame:
204 204 break
205 205
206 206 action, meta = reactor.onframerecv(frame)
207 207
208 208 if action == 'wantframe':
209 209 # Need more data before we can do anything.
210 210 continue
211 211 elif action == 'runcommand':
212 212 # Defer creating output stream because we need to wait for
213 213 # protocol settings frames so proper encoding can be applied.
214 214 if not outstream:
215 215 outstream = reactor.makeoutputstream()
216 216
217 217 sentoutput = _httpv2runcommand(ui, repo, req, res, authedperm,
218 218 reqcommand, reactor, outstream,
219 219 meta, issubsequent=seencommand)
220 220
221 221 if sentoutput:
222 222 return
223 223
224 224 seencommand = True
225 225
226 226 elif action == 'error':
227 227 # TODO define proper error mechanism.
228 228 res.status = b'200 OK'
229 229 res.headers[b'Content-Type'] = b'text/plain'
230 230 res.setbodybytes(meta['message'] + b'\n')
231 231 return
232 232 else:
233 233 raise error.ProgrammingError(
234 234 'unhandled action from frame processor: %s' % action)
235 235
236 236 action, meta = reactor.oninputeof()
237 237 if action == 'sendframes':
238 238 # We assume we haven't started sending the response yet. If we're
239 239 # wrong, the response type will raise an exception.
240 240 res.status = b'200 OK'
241 241 res.headers[b'Content-Type'] = FRAMINGTYPE
242 242 res.setbodygen(meta['framegen'])
243 243 elif action == 'noop':
244 244 pass
245 245 else:
246 246 raise error.ProgrammingError('unhandled action from frame processor: %s'
247 247 % action)
248 248
249 249 def _httpv2runcommand(ui, repo, req, res, authedperm, reqcommand, reactor,
250 250 outstream, command, issubsequent):
251 251 """Dispatch a wire protocol command made from HTTPv2 requests.
252 252
253 253 The authenticated permission (``authedperm``) along with the original
254 254 command from the URL (``reqcommand``) are passed in.
255 255 """
256 256 # We already validated that the session has permissions to perform the
257 257 # actions in ``authedperm``. In the unified frame protocol, the canonical
258 258 # command to run is expressed in a frame. However, the URL also requested
259 259 # to run a specific command. We need to be careful that the command we
260 260 # run doesn't have permissions requirements greater than what was granted
261 261 # by ``authedperm``.
262 262 #
263 263 # Our rule for this is we only allow one command per HTTP request and
264 264 # that command must match the command in the URL. However, we make
265 265 # an exception for the ``multirequest`` URL. This URL is allowed to
266 266 # execute multiple commands. We double check permissions of each command
267 267 # as it is invoked to ensure there is no privilege escalation.
268 268 # TODO consider allowing multiple commands to regular command URLs
269 269 # iff each command is the same.
270 270
271 271 proto = httpv2protocolhandler(req, ui, args=command['args'])
272 272
273 273 if reqcommand == b'multirequest':
274 274 if not COMMANDS.commandavailable(command['command'], proto):
275 275 # TODO proper error mechanism
276 276 res.status = b'200 OK'
277 277 res.headers[b'Content-Type'] = b'text/plain'
278 278 res.setbodybytes(_('wire protocol command not available: %s') %
279 279 command['command'])
280 280 return True
281 281
282 282 # TODO don't use assert here, since it may be elided by -O.
283 283 assert authedperm in (b'ro', b'rw')
284 284 wirecommand = COMMANDS[command['command']]
285 285 assert wirecommand.permission in ('push', 'pull')
286 286
287 287 if authedperm == b'ro' and wirecommand.permission != 'pull':
288 288 # TODO proper error mechanism
289 289 res.status = b'403 Forbidden'
290 290 res.headers[b'Content-Type'] = b'text/plain'
291 291 res.setbodybytes(_('insufficient permissions to execute '
292 292 'command: %s') % command['command'])
293 293 return True
294 294
295 295 # TODO should we also call checkperm() here? Maybe not if we're going
296 296 # to overhaul that API. The granted scope from the URL check should
297 297 # be good enough.
298 298
299 299 else:
300 300 # Don't allow multiple commands outside of ``multirequest`` URL.
301 301 if issubsequent:
302 302 # TODO proper error mechanism
303 303 res.status = b'200 OK'
304 304 res.headers[b'Content-Type'] = b'text/plain'
305 305 res.setbodybytes(_('multiple commands cannot be issued to this '
306 306 'URL'))
307 307 return True
308 308
309 309 if reqcommand != command['command']:
310 310 # TODO define proper error mechanism
311 311 res.status = b'200 OK'
312 312 res.headers[b'Content-Type'] = b'text/plain'
313 313 res.setbodybytes(_('command in frame must match command in URL'))
314 314 return True
315 315
316 316 res.status = b'200 OK'
317 317 res.headers[b'Content-Type'] = FRAMINGTYPE
318 318
319 319 try:
320 320 objs = dispatch(repo, proto, command['command'], command['redirect'])
321 321
322 322 action, meta = reactor.oncommandresponsereadyobjects(
323 323 outstream, command['requestid'], objs)
324 324
325 325 except error.WireprotoCommandError as e:
326 326 action, meta = reactor.oncommanderror(
327 327 outstream, command['requestid'], e.message, e.messageargs)
328 328
329 329 except Exception as e:
330 330 action, meta = reactor.onservererror(
331 331 outstream, command['requestid'],
332 332 _('exception when invoking command: %s') %
333 333 stringutil.forcebytestr(e))
334 334
335 335 if action == 'sendframes':
336 336 res.setbodygen(meta['framegen'])
337 337 return True
338 338 elif action == 'noop':
339 339 return False
340 340 else:
341 341 raise error.ProgrammingError('unhandled event from reactor: %s' %
342 342 action)
343 343
344 344 def getdispatchrepo(repo, proto, command):
345 345 viewconfig = repo.ui.config('server', 'view')
346 346 return repo.filtered(viewconfig)
347 347
348 348 def dispatch(repo, proto, command, redirect):
349 349 """Run a wire protocol command.
350 350
351 351 Returns an iterable of objects that will be sent to the client.
352 352 """
353 353 repo = getdispatchrepo(repo, proto, command)
354 354
355 355 entry = COMMANDS[command]
356 356 func = entry.func
357 357 spec = entry.args
358 358
359 359 args = proto.getargs(spec)
360 360
361 361 # There is some duplicate boilerplate code here for calling the command and
362 362 # emitting objects. It is either that or a lot of indented code that looks
363 363 # like a pyramid (since there are a lot of code paths that result in not
364 364 # using the cacher).
365 365 callcommand = lambda: func(repo, proto, **pycompat.strkwargs(args))
366 366
367 367 # Request is not cacheable. Don't bother instantiating a cacher.
368 368 if not entry.cachekeyfn:
369 369 for o in callcommand():
370 370 yield o
371 371 return
372 372
373 373 if redirect:
374 374 redirecttargets = redirect[b'targets']
375 375 redirecthashes = redirect[b'hashes']
376 376 else:
377 377 redirecttargets = []
378 378 redirecthashes = []
379 379
380 380 cacher = makeresponsecacher(repo, proto, command, args,
381 381 cborutil.streamencode,
382 382 redirecttargets=redirecttargets,
383 383 redirecthashes=redirecthashes)
384 384
385 385 # But we have no cacher. Do default handling.
386 386 if not cacher:
387 387 for o in callcommand():
388 388 yield o
389 389 return
390 390
391 391 with cacher:
392 392 cachekey = entry.cachekeyfn(repo, proto, cacher,
393 393 **pycompat.strkwargs(args))
394 394
395 395 # No cache key or the cacher doesn't like it. Do default handling.
396 396 if cachekey is None or not cacher.setcachekey(cachekey):
397 397 for o in callcommand():
398 398 yield o
399 399 return
400 400
401 401 # Serve it from the cache, if possible.
402 402 cached = cacher.lookup()
403 403
404 404 if cached:
405 405 for o in cached['objs']:
406 406 yield o
407 407 return
408 408
409 409 # Else call the command and feed its output into the cacher, allowing
410 410 # the cacher to buffer/mutate objects as it desires.
411 411 for o in callcommand():
412 412 for o in cacher.onobject(o):
413 413 yield o
414 414
415 415 for o in cacher.onfinished():
416 416 yield o
417 417
418 418 @interfaceutil.implementer(wireprototypes.baseprotocolhandler)
419 419 class httpv2protocolhandler(object):
420 420 def __init__(self, req, ui, args=None):
421 421 self._req = req
422 422 self._ui = ui
423 423 self._args = args
424 424
425 425 @property
426 426 def name(self):
427 427 return HTTP_WIREPROTO_V2
428 428
429 429 def getargs(self, args):
430 430 # First look for args that were passed but aren't registered on this
431 431 # command.
432 432 extra = set(self._args) - set(args)
433 433 if extra:
434 434 raise error.WireprotoCommandError(
435 435 'unsupported argument to command: %s' %
436 436 ', '.join(sorted(extra)))
437 437
438 438 # And look for required arguments that are missing.
439 439 missing = {a for a in args if args[a]['required']} - set(self._args)
440 440
441 441 if missing:
442 442 raise error.WireprotoCommandError(
443 443 'missing required arguments: %s' % ', '.join(sorted(missing)))
444 444
445 445 # Now derive the arguments to pass to the command, taking into
446 446 # account the arguments specified by the client.
447 447 data = {}
448 448 for k, meta in sorted(args.items()):
449 449 # This argument wasn't passed by the client.
450 450 if k not in self._args:
451 451 data[k] = meta['default']()
452 452 continue
453 453
454 454 v = self._args[k]
455 455
456 456 # Sets may be expressed as lists. Silently normalize.
457 457 if meta['type'] == 'set' and isinstance(v, list):
458 458 v = set(v)
459 459
460 460 # TODO consider more/stronger type validation.
461 461
462 462 data[k] = v
463 463
464 464 return data
465 465
466 466 def getprotocaps(self):
467 467 # Protocol capabilities are currently not implemented for HTTP V2.
468 468 return set()
469 469
470 470 def getpayload(self):
471 471 raise NotImplementedError
472 472
473 473 @contextlib.contextmanager
474 474 def mayberedirectstdio(self):
475 475 raise NotImplementedError
476 476
477 477 def client(self):
478 478 raise NotImplementedError
479 479
480 480 def addcapabilities(self, repo, caps):
481 481 return caps
482 482
483 483 def checkperm(self, perm):
484 484 raise NotImplementedError
485 485
486 486 def httpv2apidescriptor(req, repo):
487 487 proto = httpv2protocolhandler(req, repo.ui)
488 488
489 489 return _capabilitiesv2(repo, proto)
490 490
491 491 def _capabilitiesv2(repo, proto):
492 492 """Obtain the set of capabilities for version 2 transports.
493 493
494 494 These capabilities are distinct from the capabilities for version 1
495 495 transports.
496 496 """
497 497 caps = {
498 498 'commands': {},
499 499 'framingmediatypes': [FRAMINGTYPE],
500 500 'pathfilterprefixes': set(narrowspec.VALID_PREFIXES),
501 501 }
502 502
503 503 for command, entry in COMMANDS.items():
504 504 args = {}
505 505
506 506 for arg, meta in entry.args.items():
507 507 args[arg] = {
508 508 # TODO should this be a normalized type using CBOR's
509 509 # terminology?
510 510 b'type': meta['type'],
511 511 b'required': meta['required'],
512 512 }
513 513
514 514 if not meta['required']:
515 515 args[arg][b'default'] = meta['default']()
516 516
517 517 if meta['validvalues']:
518 518 args[arg][b'validvalues'] = meta['validvalues']
519 519
520 520 # TODO this type of check should be defined in a per-command callback.
521 521 if (command == b'rawstorefiledata'
522 522 and not streamclone.allowservergeneration(repo)):
523 523 continue
524 524
525 525 caps['commands'][command] = {
526 526 'args': args,
527 527 'permissions': [entry.permission],
528 528 }
529 529
530 530 if entry.extracapabilitiesfn:
531 531 extracaps = entry.extracapabilitiesfn(repo, proto)
532 532 caps['commands'][command].update(extracaps)
533 533
534 534 caps['rawrepoformats'] = sorted(repo.requirements &
535 535 repo.supportedformats)
536 536
537 537 targets = getadvertisedredirecttargets(repo, proto)
538 538 if targets:
539 539 caps[b'redirect'] = {
540 540 b'targets': [],
541 541 b'hashes': [b'sha256', b'sha1'],
542 542 }
543 543
544 544 for target in targets:
545 545 entry = {
546 546 b'name': target['name'],
547 547 b'protocol': target['protocol'],
548 548 b'uris': target['uris'],
549 549 }
550 550
551 551 for key in ('snirequired', 'tlsversions'):
552 552 if key in target:
553 553 entry[key] = target[key]
554 554
555 555 caps[b'redirect'][b'targets'].append(entry)
556 556
557 557 return proto.addcapabilities(repo, caps)
558 558
559 559 def getadvertisedredirecttargets(repo, proto):
560 560 """Obtain a list of content redirect targets.
561 561
562 562 Returns a list containing potential redirect targets that will be
563 563 advertised in capabilities data. Each dict MUST have the following
564 564 keys:
565 565
566 566 name
567 567 The name of this redirect target. This is the identifier clients use
568 568 to refer to a target. It is transferred as part of every command
569 569 request.
570 570
571 571 protocol
572 572 Network protocol used by this target. Typically this is the string
573 573 in front of the ``://`` in a URL. e.g. ``https``.
574 574
575 575 uris
576 576 List of representative URIs for this target. Clients can use the
577 577 URIs to test parsing for compatibility or for ordering preference
578 578 for which target to use.
579 579
580 580 The following optional keys are recognized:
581 581
582 582 snirequired
583 583 Bool indicating if Server Name Indication (SNI) is required to
584 584 connect to this target.
585 585
586 586 tlsversions
587 587 List of bytes indicating which TLS versions are supported by this
588 588 target.
589 589
590 590 By default, clients reflect the target order advertised by servers
591 591 and servers will use the first client-advertised target when picking
592 592 a redirect target. So targets should be advertised in the order the
593 593 server prefers they be used.
594 594 """
595 595 return []
596 596
597 597 def wireprotocommand(name, args=None, permission='push', cachekeyfn=None,
598 598 extracapabilitiesfn=None):
599 599 """Decorator to declare a wire protocol command.
600 600
601 601 ``name`` is the name of the wire protocol command being provided.
602 602
603 603 ``args`` is a dict defining arguments accepted by the command. Keys are
604 604 the argument name. Values are dicts with the following keys:
605 605
606 606 ``type``
607 607 The argument data type. Must be one of the following string
608 608 literals: ``bytes``, ``int``, ``list``, ``dict``, ``set``,
609 609 or ``bool``.
610 610
611 611 ``default``
612 612 A callable returning the default value for this argument. If not
613 613 specified, ``None`` will be the default value.
614 614
615 615 ``example``
616 616 An example value for this argument.
617 617
618 618 ``validvalues``
619 619 Set of recognized values for this argument.
620 620
621 621 ``permission`` defines the permission type needed to run this command.
622 622 Can be ``push`` or ``pull``. These roughly map to read-write and read-only,
623 623 respectively. Default is to assume command requires ``push`` permissions
624 624 because otherwise commands not declaring their permissions could modify
625 625 a repository that is supposed to be read-only.
626 626
627 627 ``cachekeyfn`` defines an optional callable that can derive the
628 628 cache key for this request.
629 629
630 630 ``extracapabilitiesfn`` defines an optional callable that defines extra
631 631 command capabilities/parameters that are advertised next to the command
632 632 in the capabilities data structure describing the server. The callable
633 633 receives as arguments the repository and protocol objects. It returns
634 634 a dict of extra fields to add to the command descriptor.
635 635
636 636 Wire protocol commands are generators of objects to be serialized and
637 637 sent to the client.
638 638
639 639 If a command raises an uncaught exception, this will be translated into
640 640 a command error.
641 641
642 642 All commands can opt in to being cacheable by defining a function
643 643 (``cachekeyfn``) that is called to derive a cache key. This function
644 644 receives the same arguments as the command itself plus a ``cacher``
645 645 argument containing the active cacher for the request and returns a bytes
646 646 containing the key in a cache the response to this command may be cached
647 647 under.
648 648 """
649 649 transports = {k for k, v in wireprototypes.TRANSPORTS.items()
650 650 if v['version'] == 2}
651 651
652 652 if permission not in ('push', 'pull'):
653 653 raise error.ProgrammingError('invalid wire protocol permission; '
654 654 'got %s; expected "push" or "pull"' %
655 655 permission)
656 656
657 657 if args is None:
658 658 args = {}
659 659
660 660 if not isinstance(args, dict):
661 661 raise error.ProgrammingError('arguments for version 2 commands '
662 662 'must be declared as dicts')
663 663
664 664 for arg, meta in args.items():
665 665 if arg == '*':
666 666 raise error.ProgrammingError('* argument name not allowed on '
667 667 'version 2 commands')
668 668
669 669 if not isinstance(meta, dict):
670 670 raise error.ProgrammingError('arguments for version 2 commands '
671 671 'must declare metadata as a dict')
672 672
673 673 if 'type' not in meta:
674 674 raise error.ProgrammingError('%s argument for command %s does not '
675 675 'declare type field' % (arg, name))
676 676
677 677 if meta['type'] not in ('bytes', 'int', 'list', 'dict', 'set', 'bool'):
678 678 raise error.ProgrammingError('%s argument for command %s has '
679 679 'illegal type: %s' % (arg, name,
680 680 meta['type']))
681 681
682 682 if 'example' not in meta:
683 683 raise error.ProgrammingError('%s argument for command %s does not '
684 684 'declare example field' % (arg, name))
685 685
686 686 meta['required'] = 'default' not in meta
687 687
688 688 meta.setdefault('default', lambda: None)
689 689 meta.setdefault('validvalues', None)
690 690
691 691 def register(func):
692 692 if name in COMMANDS:
693 693 raise error.ProgrammingError('%s command already registered '
694 694 'for version 2' % name)
695 695
696 696 COMMANDS[name] = wireprototypes.commandentry(
697 697 func, args=args, transports=transports, permission=permission,
698 698 cachekeyfn=cachekeyfn, extracapabilitiesfn=extracapabilitiesfn)
699 699
700 700 return func
701 701
702 702 return register
703 703
704 704 def makecommandcachekeyfn(command, localversion=None, allargs=False):
705 705 """Construct a cache key derivation function with common features.
706 706
707 707 By default, the cache key is a hash of:
708 708
709 709 * The command name.
710 710 * A global cache version number.
711 711 * A local cache version number (passed via ``localversion``).
712 712 * All the arguments passed to the command.
713 713 * The media type used.
714 714 * Wire protocol version string.
715 715 * The repository path.
716 716 """
717 717 if not allargs:
718 718 raise error.ProgrammingError('only allargs=True is currently supported')
719 719
720 720 if localversion is None:
721 721 raise error.ProgrammingError('must set localversion argument value')
722 722
723 723 def cachekeyfn(repo, proto, cacher, **args):
724 724 spec = COMMANDS[command]
725 725
726 726 # Commands that mutate the repo can not be cached.
727 727 if spec.permission == 'push':
728 728 return None
729 729
730 730 # TODO config option to disable caching.
731 731
732 732 # Our key derivation strategy is to construct a data structure
733 733 # holding everything that could influence cacheability and to hash
734 734 # the CBOR representation of that. Using CBOR seems like it might
735 735 # be overkill. However, simpler hashing mechanisms are prone to
736 736 # duplicate input issues. e.g. if you just concatenate two values,
737 737 # "foo"+"bar" is identical to "fo"+"obar". Using CBOR provides
738 738 # "padding" between values and prevents these problems.
739 739
740 740 # Seed the hash with various data.
741 741 state = {
742 742 # To invalidate all cache keys.
743 743 b'globalversion': GLOBAL_CACHE_VERSION,
744 744 # More granular cache key invalidation.
745 745 b'localversion': localversion,
746 746 # Cache keys are segmented by command.
747 747 b'command': command,
748 748 # Throw in the media type and API version strings so changes
749 749 # to exchange semantics invalid cache.
750 750 b'mediatype': FRAMINGTYPE,
751 751 b'version': HTTP_WIREPROTO_V2,
752 752 # So same requests for different repos don't share cache keys.
753 753 b'repo': repo.root,
754 754 }
755 755
756 756 # The arguments passed to us will have already been normalized.
757 757 # Default values will be set, etc. This is important because it
758 758 # means that it doesn't matter if clients send an explicit argument
759 759 # or rely on the default value: it will all normalize to the same
760 760 # set of arguments on the server and therefore the same cache key.
761 761 #
762 762 # Arguments by their very nature must support being encoded to CBOR.
763 763 # And the CBOR encoder is deterministic. So we hash the arguments
764 764 # by feeding the CBOR of their representation into the hasher.
765 765 if allargs:
766 766 state[b'args'] = pycompat.byteskwargs(args)
767 767
768 768 cacher.adjustcachekeystate(state)
769 769
770 770 hasher = hashlib.sha1()
771 771 for chunk in cborutil.streamencode(state):
772 772 hasher.update(chunk)
773 773
774 774 return pycompat.sysbytes(hasher.hexdigest())
775 775
776 776 return cachekeyfn
777 777
778 778 def makeresponsecacher(repo, proto, command, args, objencoderfn,
779 779 redirecttargets, redirecthashes):
780 780 """Construct a cacher for a cacheable command.
781 781
782 782 Returns an ``iwireprotocolcommandcacher`` instance.
783 783
784 784 Extensions can monkeypatch this function to provide custom caching
785 785 backends.
786 786 """
787 787 return None
788 788
789 789 def resolvenodes(repo, revisions):
790 790 """Resolve nodes from a revisions specifier data structure."""
791 791 cl = repo.changelog
792 792 clhasnode = cl.hasnode
793 793
794 794 seen = set()
795 795 nodes = []
796 796
797 797 if not isinstance(revisions, list):
798 798 raise error.WireprotoCommandError('revisions must be defined as an '
799 799 'array')
800 800
801 801 for spec in revisions:
802 802 if b'type' not in spec:
803 803 raise error.WireprotoCommandError(
804 804 'type key not present in revision specifier')
805 805
806 806 typ = spec[b'type']
807 807
808 808 if typ == b'changesetexplicit':
809 809 if b'nodes' not in spec:
810 810 raise error.WireprotoCommandError(
811 811 'nodes key not present in changesetexplicit revision '
812 812 'specifier')
813 813
814 814 for node in spec[b'nodes']:
815 815 if node not in seen:
816 816 nodes.append(node)
817 817 seen.add(node)
818 818
819 819 elif typ == b'changesetexplicitdepth':
820 820 for key in (b'nodes', b'depth'):
821 821 if key not in spec:
822 822 raise error.WireprotoCommandError(
823 823 '%s key not present in changesetexplicitdepth revision '
824 824 'specifier', (key,))
825 825
826 826 for rev in repo.revs(b'ancestors(%ln, %s)', spec[b'nodes'],
827 827 spec[b'depth'] - 1):
828 828 node = cl.node(rev)
829 829
830 830 if node not in seen:
831 831 nodes.append(node)
832 832 seen.add(node)
833 833
834 834 elif typ == b'changesetdagrange':
835 835 for key in (b'roots', b'heads'):
836 836 if key not in spec:
837 837 raise error.WireprotoCommandError(
838 838 '%s key not present in changesetdagrange revision '
839 839 'specifier', (key,))
840 840
841 841 if not spec[b'heads']:
842 842 raise error.WireprotoCommandError(
843 843 'heads key in changesetdagrange cannot be empty')
844 844
845 845 if spec[b'roots']:
846 846 common = [n for n in spec[b'roots'] if clhasnode(n)]
847 847 else:
848 848 common = [nullid]
849 849
850 850 for n in discovery.outgoing(repo, common, spec[b'heads']).missing:
851 851 if n not in seen:
852 852 nodes.append(n)
853 853 seen.add(n)
854 854
855 855 else:
856 856 raise error.WireprotoCommandError(
857 857 'unknown revision specifier type: %s', (typ,))
858 858
859 859 return nodes
860 860
861 861 @wireprotocommand('branchmap', permission='pull')
862 862 def branchmapv2(repo, proto):
863 863 yield {encoding.fromlocal(k): v
864 864 for k, v in repo.branchmap().iteritems()}
865 865
866 866 @wireprotocommand('capabilities', permission='pull')
867 867 def capabilitiesv2(repo, proto):
868 868 yield _capabilitiesv2(repo, proto)
869 869
870 870 @wireprotocommand(
871 871 'changesetdata',
872 872 args={
873 873 'revisions': {
874 874 'type': 'list',
875 875 'example': [{
876 876 b'type': b'changesetexplicit',
877 877 b'nodes': [b'abcdef...'],
878 878 }],
879 879 },
880 880 'fields': {
881 881 'type': 'set',
882 882 'default': set,
883 883 'example': {b'parents', b'revision'},
884 884 'validvalues': {b'bookmarks', b'parents', b'phase', b'revision'},
885 885 },
886 886 },
887 887 permission='pull')
888 888 def changesetdata(repo, proto, revisions, fields):
889 889 # TODO look for unknown fields and abort when they can't be serviced.
890 890 # This could probably be validated by dispatcher using validvalues.
891 891
892 892 cl = repo.changelog
893 893 outgoing = resolvenodes(repo, revisions)
894 894 publishing = repo.publishing()
895 895
896 896 if outgoing:
897 897 repo.hook('preoutgoing', throw=True, source='serve')
898 898
899 899 yield {
900 900 b'totalitems': len(outgoing),
901 901 }
902 902
903 903 # The phases of nodes already transferred to the client may have changed
904 904 # since the client last requested data. We send phase-only records
905 905 # for these revisions, if requested.
906 906 # TODO actually do this. We'll probably want to emit phase heads
907 907 # in the ancestry set of the outgoing revisions. This will ensure
908 908 # that phase updates within that set are seen.
909 909 if b'phase' in fields:
910 910 pass
911 911
912 912 nodebookmarks = {}
913 913 for mark, node in repo._bookmarks.items():
914 914 nodebookmarks.setdefault(node, set()).add(mark)
915 915
916 916 # It is already topologically sorted by revision number.
917 917 for node in outgoing:
918 918 d = {
919 919 b'node': node,
920 920 }
921 921
922 922 if b'parents' in fields:
923 923 d[b'parents'] = cl.parents(node)
924 924
925 925 if b'phase' in fields:
926 926 if publishing:
927 927 d[b'phase'] = b'public'
928 928 else:
929 929 ctx = repo[node]
930 930 d[b'phase'] = ctx.phasestr()
931 931
932 932 if b'bookmarks' in fields and node in nodebookmarks:
933 933 d[b'bookmarks'] = sorted(nodebookmarks[node])
934 934 del nodebookmarks[node]
935 935
936 936 followingmeta = []
937 937 followingdata = []
938 938
939 939 if b'revision' in fields:
940 revisiondata = cl.revision(node, raw=True)
940 revisiondata = cl.rawdata(node)
941 941 followingmeta.append((b'revision', len(revisiondata)))
942 942 followingdata.append(revisiondata)
943 943
944 944 # TODO make it possible for extensions to wrap a function or register
945 945 # a handler to service custom fields.
946 946
947 947 if followingmeta:
948 948 d[b'fieldsfollowing'] = followingmeta
949 949
950 950 yield d
951 951
952 952 for extra in followingdata:
953 953 yield extra
954 954
955 955 # If requested, send bookmarks from nodes that didn't have revision
956 956 # data sent so receiver is aware of any bookmark updates.
957 957 if b'bookmarks' in fields:
958 958 for node, marks in sorted(nodebookmarks.iteritems()):
959 959 yield {
960 960 b'node': node,
961 961 b'bookmarks': sorted(marks),
962 962 }
963 963
964 964 class FileAccessError(Exception):
965 965 """Represents an error accessing a specific file."""
966 966
967 967 def __init__(self, path, msg, args):
968 968 self.path = path
969 969 self.msg = msg
970 970 self.args = args
971 971
972 972 def getfilestore(repo, proto, path):
973 973 """Obtain a file storage object for use with wire protocol.
974 974
975 975 Exists as a standalone function so extensions can monkeypatch to add
976 976 access control.
977 977 """
978 978 # This seems to work even if the file doesn't exist. So catch
979 979 # "empty" files and return an error.
980 980 fl = repo.file(path)
981 981
982 982 if not len(fl):
983 983 raise FileAccessError(path, 'unknown file: %s', (path,))
984 984
985 985 return fl
986 986
987 987 def emitfilerevisions(repo, path, revisions, linknodes, fields):
988 988 for revision in revisions:
989 989 d = {
990 990 b'node': revision.node,
991 991 }
992 992
993 993 if b'parents' in fields:
994 994 d[b'parents'] = [revision.p1node, revision.p2node]
995 995
996 996 if b'linknode' in fields:
997 997 d[b'linknode'] = linknodes[revision.node]
998 998
999 999 followingmeta = []
1000 1000 followingdata = []
1001 1001
1002 1002 if b'revision' in fields:
1003 1003 if revision.revision is not None:
1004 1004 followingmeta.append((b'revision', len(revision.revision)))
1005 1005 followingdata.append(revision.revision)
1006 1006 else:
1007 1007 d[b'deltabasenode'] = revision.basenode
1008 1008 followingmeta.append((b'delta', len(revision.delta)))
1009 1009 followingdata.append(revision.delta)
1010 1010
1011 1011 if followingmeta:
1012 1012 d[b'fieldsfollowing'] = followingmeta
1013 1013
1014 1014 yield d
1015 1015
1016 1016 for extra in followingdata:
1017 1017 yield extra
1018 1018
1019 1019 def makefilematcher(repo, pathfilter):
1020 1020 """Construct a matcher from a path filter dict."""
1021 1021
1022 1022 # Validate values.
1023 1023 if pathfilter:
1024 1024 for key in (b'include', b'exclude'):
1025 1025 for pattern in pathfilter.get(key, []):
1026 1026 if not pattern.startswith((b'path:', b'rootfilesin:')):
1027 1027 raise error.WireprotoCommandError(
1028 1028 '%s pattern must begin with `path:` or `rootfilesin:`; '
1029 1029 'got %s', (key, pattern))
1030 1030
1031 1031 if pathfilter:
1032 1032 matcher = matchmod.match(repo.root, b'',
1033 1033 include=pathfilter.get(b'include', []),
1034 1034 exclude=pathfilter.get(b'exclude', []))
1035 1035 else:
1036 1036 matcher = matchmod.match(repo.root, b'')
1037 1037
1038 1038 # Requested patterns could include files not in the local store. So
1039 1039 # filter those out.
1040 1040 return repo.narrowmatch(matcher)
1041 1041
1042 1042 @wireprotocommand(
1043 1043 'filedata',
1044 1044 args={
1045 1045 'haveparents': {
1046 1046 'type': 'bool',
1047 1047 'default': lambda: False,
1048 1048 'example': True,
1049 1049 },
1050 1050 'nodes': {
1051 1051 'type': 'list',
1052 1052 'example': [b'0123456...'],
1053 1053 },
1054 1054 'fields': {
1055 1055 'type': 'set',
1056 1056 'default': set,
1057 1057 'example': {b'parents', b'revision'},
1058 1058 'validvalues': {b'parents', b'revision', b'linknode'},
1059 1059 },
1060 1060 'path': {
1061 1061 'type': 'bytes',
1062 1062 'example': b'foo.txt',
1063 1063 }
1064 1064 },
1065 1065 permission='pull',
1066 1066 # TODO censoring a file revision won't invalidate the cache.
1067 1067 # Figure out a way to take censoring into account when deriving
1068 1068 # the cache key.
1069 1069 cachekeyfn=makecommandcachekeyfn('filedata', 1, allargs=True))
1070 1070 def filedata(repo, proto, haveparents, nodes, fields, path):
1071 1071 # TODO this API allows access to file revisions that are attached to
1072 1072 # secret changesets. filesdata does not have this problem. Maybe this
1073 1073 # API should be deleted?
1074 1074
1075 1075 try:
1076 1076 # Extensions may wish to access the protocol handler.
1077 1077 store = getfilestore(repo, proto, path)
1078 1078 except FileAccessError as e:
1079 1079 raise error.WireprotoCommandError(e.msg, e.args)
1080 1080
1081 1081 clnode = repo.changelog.node
1082 1082 linknodes = {}
1083 1083
1084 1084 # Validate requested nodes.
1085 1085 for node in nodes:
1086 1086 try:
1087 1087 store.rev(node)
1088 1088 except error.LookupError:
1089 1089 raise error.WireprotoCommandError('unknown file node: %s',
1090 1090 (hex(node),))
1091 1091
1092 1092 # TODO by creating the filectx against a specific file revision
1093 1093 # instead of changeset, linkrev() is always used. This is wrong for
1094 1094 # cases where linkrev() may refer to a hidden changeset. But since this
1095 1095 # API doesn't know anything about changesets, we're not sure how to
1096 1096 # disambiguate the linknode. Perhaps we should delete this API?
1097 1097 fctx = repo.filectx(path, fileid=node)
1098 1098 linknodes[node] = clnode(fctx.introrev())
1099 1099
1100 1100 revisions = store.emitrevisions(nodes,
1101 1101 revisiondata=b'revision' in fields,
1102 1102 assumehaveparentrevisions=haveparents)
1103 1103
1104 1104 yield {
1105 1105 b'totalitems': len(nodes),
1106 1106 }
1107 1107
1108 1108 for o in emitfilerevisions(repo, path, revisions, linknodes, fields):
1109 1109 yield o
1110 1110
1111 1111 def filesdatacapabilities(repo, proto):
1112 1112 batchsize = repo.ui.configint(
1113 1113 b'experimental', b'server.filesdata.recommended-batch-size')
1114 1114 return {
1115 1115 b'recommendedbatchsize': batchsize,
1116 1116 }
1117 1117
1118 1118 @wireprotocommand(
1119 1119 'filesdata',
1120 1120 args={
1121 1121 'haveparents': {
1122 1122 'type': 'bool',
1123 1123 'default': lambda: False,
1124 1124 'example': True,
1125 1125 },
1126 1126 'fields': {
1127 1127 'type': 'set',
1128 1128 'default': set,
1129 1129 'example': {b'parents', b'revision'},
1130 1130 'validvalues': {b'firstchangeset', b'linknode', b'parents',
1131 1131 b'revision'},
1132 1132 },
1133 1133 'pathfilter': {
1134 1134 'type': 'dict',
1135 1135 'default': lambda: None,
1136 1136 'example': {b'include': [b'path:tests']},
1137 1137 },
1138 1138 'revisions': {
1139 1139 'type': 'list',
1140 1140 'example': [{
1141 1141 b'type': b'changesetexplicit',
1142 1142 b'nodes': [b'abcdef...'],
1143 1143 }],
1144 1144 },
1145 1145 },
1146 1146 permission='pull',
1147 1147 # TODO censoring a file revision won't invalidate the cache.
1148 1148 # Figure out a way to take censoring into account when deriving
1149 1149 # the cache key.
1150 1150 cachekeyfn=makecommandcachekeyfn('filesdata', 1, allargs=True),
1151 1151 extracapabilitiesfn=filesdatacapabilities)
1152 1152 def filesdata(repo, proto, haveparents, fields, pathfilter, revisions):
1153 1153 # TODO This should operate on a repo that exposes obsolete changesets. There
1154 1154 # is a race between a client making a push that obsoletes a changeset and
1155 1155 # another client fetching files data for that changeset. If a client has a
1156 1156 # changeset, it should probably be allowed to access files data for that
1157 1157 # changeset.
1158 1158
1159 1159 outgoing = resolvenodes(repo, revisions)
1160 1160 filematcher = makefilematcher(repo, pathfilter)
1161 1161
1162 1162 # path -> {fnode: linknode}
1163 1163 fnodes = collections.defaultdict(dict)
1164 1164
1165 1165 # We collect the set of relevant file revisions by iterating the changeset
1166 1166 # revisions and either walking the set of files recorded in the changeset
1167 1167 # or by walking the manifest at that revision. There is probably room for a
1168 1168 # storage-level API to request this data, as it can be expensive to compute
1169 1169 # and would benefit from caching or alternate storage from what revlogs
1170 1170 # provide.
1171 1171 for node in outgoing:
1172 1172 ctx = repo[node]
1173 1173 mctx = ctx.manifestctx()
1174 1174 md = mctx.read()
1175 1175
1176 1176 if haveparents:
1177 1177 checkpaths = ctx.files()
1178 1178 else:
1179 1179 checkpaths = md.keys()
1180 1180
1181 1181 for path in checkpaths:
1182 1182 fnode = md[path]
1183 1183
1184 1184 if path in fnodes and fnode in fnodes[path]:
1185 1185 continue
1186 1186
1187 1187 if not filematcher(path):
1188 1188 continue
1189 1189
1190 1190 fnodes[path].setdefault(fnode, node)
1191 1191
1192 1192 yield {
1193 1193 b'totalpaths': len(fnodes),
1194 1194 b'totalitems': sum(len(v) for v in fnodes.values())
1195 1195 }
1196 1196
1197 1197 for path, filenodes in sorted(fnodes.items()):
1198 1198 try:
1199 1199 store = getfilestore(repo, proto, path)
1200 1200 except FileAccessError as e:
1201 1201 raise error.WireprotoCommandError(e.msg, e.args)
1202 1202
1203 1203 yield {
1204 1204 b'path': path,
1205 1205 b'totalitems': len(filenodes),
1206 1206 }
1207 1207
1208 1208 revisions = store.emitrevisions(filenodes.keys(),
1209 1209 revisiondata=b'revision' in fields,
1210 1210 assumehaveparentrevisions=haveparents)
1211 1211
1212 1212 for o in emitfilerevisions(repo, path, revisions, filenodes, fields):
1213 1213 yield o
1214 1214
1215 1215 @wireprotocommand(
1216 1216 'heads',
1217 1217 args={
1218 1218 'publiconly': {
1219 1219 'type': 'bool',
1220 1220 'default': lambda: False,
1221 1221 'example': False,
1222 1222 },
1223 1223 },
1224 1224 permission='pull')
1225 1225 def headsv2(repo, proto, publiconly):
1226 1226 if publiconly:
1227 1227 repo = repo.filtered('immutable')
1228 1228
1229 1229 yield repo.heads()
1230 1230
1231 1231 @wireprotocommand(
1232 1232 'known',
1233 1233 args={
1234 1234 'nodes': {
1235 1235 'type': 'list',
1236 1236 'default': list,
1237 1237 'example': [b'deadbeef'],
1238 1238 },
1239 1239 },
1240 1240 permission='pull')
1241 1241 def knownv2(repo, proto, nodes):
1242 1242 result = b''.join(b'1' if n else b'0' for n in repo.known(nodes))
1243 1243 yield result
1244 1244
1245 1245 @wireprotocommand(
1246 1246 'listkeys',
1247 1247 args={
1248 1248 'namespace': {
1249 1249 'type': 'bytes',
1250 1250 'example': b'ns',
1251 1251 },
1252 1252 },
1253 1253 permission='pull')
1254 1254 def listkeysv2(repo, proto, namespace):
1255 1255 keys = repo.listkeys(encoding.tolocal(namespace))
1256 1256 keys = {encoding.fromlocal(k): encoding.fromlocal(v)
1257 1257 for k, v in keys.iteritems()}
1258 1258
1259 1259 yield keys
1260 1260
1261 1261 @wireprotocommand(
1262 1262 'lookup',
1263 1263 args={
1264 1264 'key': {
1265 1265 'type': 'bytes',
1266 1266 'example': b'foo',
1267 1267 },
1268 1268 },
1269 1269 permission='pull')
1270 1270 def lookupv2(repo, proto, key):
1271 1271 key = encoding.tolocal(key)
1272 1272
1273 1273 # TODO handle exception.
1274 1274 node = repo.lookup(key)
1275 1275
1276 1276 yield node
1277 1277
1278 1278 def manifestdatacapabilities(repo, proto):
1279 1279 batchsize = repo.ui.configint(
1280 1280 b'experimental', b'server.manifestdata.recommended-batch-size')
1281 1281
1282 1282 return {
1283 1283 b'recommendedbatchsize': batchsize,
1284 1284 }
1285 1285
1286 1286 @wireprotocommand(
1287 1287 'manifestdata',
1288 1288 args={
1289 1289 'nodes': {
1290 1290 'type': 'list',
1291 1291 'example': [b'0123456...'],
1292 1292 },
1293 1293 'haveparents': {
1294 1294 'type': 'bool',
1295 1295 'default': lambda: False,
1296 1296 'example': True,
1297 1297 },
1298 1298 'fields': {
1299 1299 'type': 'set',
1300 1300 'default': set,
1301 1301 'example': {b'parents', b'revision'},
1302 1302 'validvalues': {b'parents', b'revision'},
1303 1303 },
1304 1304 'tree': {
1305 1305 'type': 'bytes',
1306 1306 'example': b'',
1307 1307 },
1308 1308 },
1309 1309 permission='pull',
1310 1310 cachekeyfn=makecommandcachekeyfn('manifestdata', 1, allargs=True),
1311 1311 extracapabilitiesfn=manifestdatacapabilities)
1312 1312 def manifestdata(repo, proto, haveparents, nodes, fields, tree):
1313 1313 store = repo.manifestlog.getstorage(tree)
1314 1314
1315 1315 # Validate the node is known and abort on unknown revisions.
1316 1316 for node in nodes:
1317 1317 try:
1318 1318 store.rev(node)
1319 1319 except error.LookupError:
1320 1320 raise error.WireprotoCommandError(
1321 1321 'unknown node: %s', (node,))
1322 1322
1323 1323 revisions = store.emitrevisions(nodes,
1324 1324 revisiondata=b'revision' in fields,
1325 1325 assumehaveparentrevisions=haveparents)
1326 1326
1327 1327 yield {
1328 1328 b'totalitems': len(nodes),
1329 1329 }
1330 1330
1331 1331 for revision in revisions:
1332 1332 d = {
1333 1333 b'node': revision.node,
1334 1334 }
1335 1335
1336 1336 if b'parents' in fields:
1337 1337 d[b'parents'] = [revision.p1node, revision.p2node]
1338 1338
1339 1339 followingmeta = []
1340 1340 followingdata = []
1341 1341
1342 1342 if b'revision' in fields:
1343 1343 if revision.revision is not None:
1344 1344 followingmeta.append((b'revision', len(revision.revision)))
1345 1345 followingdata.append(revision.revision)
1346 1346 else:
1347 1347 d[b'deltabasenode'] = revision.basenode
1348 1348 followingmeta.append((b'delta', len(revision.delta)))
1349 1349 followingdata.append(revision.delta)
1350 1350
1351 1351 if followingmeta:
1352 1352 d[b'fieldsfollowing'] = followingmeta
1353 1353
1354 1354 yield d
1355 1355
1356 1356 for extra in followingdata:
1357 1357 yield extra
1358 1358
1359 1359 @wireprotocommand(
1360 1360 'pushkey',
1361 1361 args={
1362 1362 'namespace': {
1363 1363 'type': 'bytes',
1364 1364 'example': b'ns',
1365 1365 },
1366 1366 'key': {
1367 1367 'type': 'bytes',
1368 1368 'example': b'key',
1369 1369 },
1370 1370 'old': {
1371 1371 'type': 'bytes',
1372 1372 'example': b'old',
1373 1373 },
1374 1374 'new': {
1375 1375 'type': 'bytes',
1376 1376 'example': 'new',
1377 1377 },
1378 1378 },
1379 1379 permission='push')
1380 1380 def pushkeyv2(repo, proto, namespace, key, old, new):
1381 1381 # TODO handle ui output redirection
1382 1382 yield repo.pushkey(encoding.tolocal(namespace),
1383 1383 encoding.tolocal(key),
1384 1384 encoding.tolocal(old),
1385 1385 encoding.tolocal(new))
1386 1386
1387 1387
1388 1388 @wireprotocommand(
1389 1389 'rawstorefiledata',
1390 1390 args={
1391 1391 'files': {
1392 1392 'type': 'list',
1393 1393 'example': [b'changelog', b'manifestlog'],
1394 1394 },
1395 1395 'pathfilter': {
1396 1396 'type': 'list',
1397 1397 'default': lambda: None,
1398 1398 'example': {b'include': [b'path:tests']},
1399 1399 },
1400 1400 },
1401 1401 permission='pull')
1402 1402 def rawstorefiledata(repo, proto, files, pathfilter):
1403 1403 if not streamclone.allowservergeneration(repo):
1404 1404 raise error.WireprotoCommandError(b'stream clone is disabled')
1405 1405
1406 1406 # TODO support dynamically advertising what store files "sets" are
1407 1407 # available. For now, we support changelog, manifestlog, and files.
1408 1408 files = set(files)
1409 1409 allowedfiles = {b'changelog', b'manifestlog'}
1410 1410
1411 1411 unsupported = files - allowedfiles
1412 1412 if unsupported:
1413 1413 raise error.WireprotoCommandError(b'unknown file type: %s',
1414 1414 (b', '.join(sorted(unsupported)),))
1415 1415
1416 1416 with repo.lock():
1417 1417 topfiles = list(repo.store.topfiles())
1418 1418
1419 1419 sendfiles = []
1420 1420 totalsize = 0
1421 1421
1422 1422 # TODO this is a bunch of storage layer interface abstractions because
1423 1423 # it assumes revlogs.
1424 1424 for name, encodedname, size in topfiles:
1425 1425 if b'changelog' in files and name.startswith(b'00changelog'):
1426 1426 pass
1427 1427 elif b'manifestlog' in files and name.startswith(b'00manifest'):
1428 1428 pass
1429 1429 else:
1430 1430 continue
1431 1431
1432 1432 sendfiles.append((b'store', name, size))
1433 1433 totalsize += size
1434 1434
1435 1435 yield {
1436 1436 b'filecount': len(sendfiles),
1437 1437 b'totalsize': totalsize,
1438 1438 }
1439 1439
1440 1440 for location, name, size in sendfiles:
1441 1441 yield {
1442 1442 b'location': location,
1443 1443 b'path': name,
1444 1444 b'size': size,
1445 1445 }
1446 1446
1447 1447 # We have to use a closure for this to ensure the context manager is
1448 1448 # closed only after sending the final chunk.
1449 1449 def getfiledata():
1450 1450 with repo.svfs(name, 'rb', auditpath=False) as fh:
1451 1451 for chunk in util.filechunkiter(fh, limit=size):
1452 1452 yield chunk
1453 1453
1454 1454 yield wireprototypes.indefinitebytestringresponse(
1455 1455 getfiledata())
General Comments 0
You need to be logged in to leave comments. Login now