wireprotov2server: use our JSON encoder...
Gregory Szorc
r41448:e82288a9 default
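The diff below adds templatefilters to the module imports, drops the function-level stdlib `import json` from the debugreflect handler, and replaces the two `json.dumps(...)` calls with Mercurial's own `templatefilters.json(...)` encoder. A plausible motivation (hedged, since the commit message is truncated above) is that the frame metadata being dumped is bytes-heavy, and the stdlib encoder refuses to serialize bytes under Python 3. A minimal sketch of that failure mode; the dict contents are illustrative only:

```python
import json

# Frame actions and metadata in wireprotov2server are bytes, not str.
try:
    json.dumps({b'action': b'runcommand'})
except TypeError as exc:
    # Python 3 rejects bytes keys outright:
    # "keys must be str, int, float, bool or None, not bytes"
    print(exc)
```

The replacement call, `templatefilters.json((action, meta))`, appears exactly as shown in the added lines of the hunk; since the handler joins the collected states with `b'\n'`, the encoder evidently yields bytes suitable for the plain-text response body.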
@@ -1,1456 +1,1454
1 1 # Copyright 21 May 2005 - (c) 2005 Jake Edge <jake@edge2.net>
2 2 # Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
3 3 #
4 4 # This software may be used and distributed according to the terms of the
5 5 # GNU General Public License version 2 or any later version.
6 6
7 7 from __future__ import absolute_import
8 8
9 9 import collections
10 10 import contextlib
11 11 import hashlib
12 12
13 13 from .i18n import _
14 14 from .node import (
15 15 hex,
16 16 nullid,
17 17 )
18 18 from . import (
19 19 discovery,
20 20 encoding,
21 21 error,
22 22 match as matchmod,
23 23 narrowspec,
24 24 pycompat,
25 25 streamclone,
26 templatefilters,
26 27 util,
27 28 wireprotoframing,
28 29 wireprototypes,
29 30 )
30 31 from .utils import (
31 32 cborutil,
32 33 interfaceutil,
33 34 stringutil,
34 35 )
35 36
36 37 FRAMINGTYPE = b'application/mercurial-exp-framing-0006'
37 38
38 39 HTTP_WIREPROTO_V2 = wireprototypes.HTTP_WIREPROTO_V2
39 40
40 41 COMMANDS = wireprototypes.commanddict()
41 42
42 43 # Value inserted into cache key computation function. Change the value to
43 44 # force new cache keys for every command request. This should be done when
44 45 # there is a change to how caching works, etc.
45 46 GLOBAL_CACHE_VERSION = 1
46 47
47 48 def handlehttpv2request(rctx, req, res, checkperm, urlparts):
48 49 from .hgweb import common as hgwebcommon
49 50
50 51 # URL space looks like: <permissions>/<command>, where <permission> can
51 52 # be ``ro`` or ``rw`` to signal read-only or read-write, respectively.
52 53
53 54 # Root URL does nothing meaningful... yet.
54 55 if not urlparts:
55 56 res.status = b'200 OK'
56 57 res.headers[b'Content-Type'] = b'text/plain'
57 58 res.setbodybytes(_('HTTP version 2 API handler'))
58 59 return
59 60
60 61 if len(urlparts) == 1:
61 62 res.status = b'404 Not Found'
62 63 res.headers[b'Content-Type'] = b'text/plain'
63 64 res.setbodybytes(_('do not know how to process %s\n') %
64 65 req.dispatchpath)
65 66 return
66 67
67 68 permission, command = urlparts[0:2]
68 69
69 70 if permission not in (b'ro', b'rw'):
70 71 res.status = b'404 Not Found'
71 72 res.headers[b'Content-Type'] = b'text/plain'
72 73 res.setbodybytes(_('unknown permission: %s') % permission)
73 74 return
74 75
75 76 if req.method != 'POST':
76 77 res.status = b'405 Method Not Allowed'
77 78 res.headers[b'Allow'] = b'POST'
78 79 res.setbodybytes(_('commands require POST requests'))
79 80 return
80 81
81 82 # At some point we'll want to use our own API instead of recycling the
82 83 # behavior of version 1 of the wire protocol...
83 84 # TODO return reasonable responses - not responses that overload the
84 85 # HTTP status line message for error reporting.
85 86 try:
86 87 checkperm(rctx, req, 'pull' if permission == b'ro' else 'push')
87 88 except hgwebcommon.ErrorResponse as e:
88 89 res.status = hgwebcommon.statusmessage(e.code, pycompat.bytestr(e))
89 90 for k, v in e.headers:
90 91 res.headers[k] = v
91 92 res.setbodybytes('permission denied')
92 93 return
93 94
94 95 # We have a special endpoint to reflect the request back at the client.
95 96 if command == b'debugreflect':
96 97 _processhttpv2reflectrequest(rctx.repo.ui, rctx.repo, req, res)
97 98 return
98 99
99 100 # Extra commands that we handle that aren't really wire protocol
100 101 # commands. Think extra hard before making this hackery available to
101 102 # extensions.
102 103 extracommands = {'multirequest'}
103 104
104 105 if command not in COMMANDS and command not in extracommands:
105 106 res.status = b'404 Not Found'
106 107 res.headers[b'Content-Type'] = b'text/plain'
107 108 res.setbodybytes(_('unknown wire protocol command: %s\n') % command)
108 109 return
109 110
110 111 repo = rctx.repo
111 112 ui = repo.ui
112 113
113 114 proto = httpv2protocolhandler(req, ui)
114 115
115 116 if (not COMMANDS.commandavailable(command, proto)
116 117 and command not in extracommands):
117 118 res.status = b'404 Not Found'
118 119 res.headers[b'Content-Type'] = b'text/plain'
119 120 res.setbodybytes(_('invalid wire protocol command: %s') % command)
120 121 return
121 122
122 123 # TODO consider cases where proxies may add additional Accept headers.
123 124 if req.headers.get(b'Accept') != FRAMINGTYPE:
124 125 res.status = b'406 Not Acceptable'
125 126 res.headers[b'Content-Type'] = b'text/plain'
126 127 res.setbodybytes(_('client MUST specify Accept header with value: %s\n')
127 128 % FRAMINGTYPE)
128 129 return
129 130
130 131 if req.headers.get(b'Content-Type') != FRAMINGTYPE:
131 132 res.status = b'415 Unsupported Media Type'
132 133 # TODO we should send a response with appropriate media type,
133 134 # since client does Accept it.
134 135 res.headers[b'Content-Type'] = b'text/plain'
135 136 res.setbodybytes(_('client MUST send Content-Type header with '
136 137 'value: %s\n') % FRAMINGTYPE)
137 138 return
138 139
139 140 _processhttpv2request(ui, repo, req, res, permission, command, proto)
140 141
141 142 def _processhttpv2reflectrequest(ui, repo, req, res):
142 143 """Reads unified frame protocol request and dumps out state to client.
143 144
144 145 This special endpoint can be used to help debug the wire protocol.
145 146
146 147 Instead of routing the request through the normal dispatch mechanism,
147 148 we instead read all frames, decode them, and feed them into our state
148 149 tracker. We then dump the log of all that activity back out to the
149 150 client.
150 151 """
151 import json
152
153 152 # Reflection APIs have a history of being abused, accidentally disclosing
154 153 # sensitive data, etc. So we have a config knob.
155 154 if not ui.configbool('experimental', 'web.api.debugreflect'):
156 155 res.status = b'404 Not Found'
157 156 res.headers[b'Content-Type'] = b'text/plain'
158 157 res.setbodybytes(_('debugreflect service not available'))
159 158 return
160 159
161 160 # We assume we have a unified framing protocol request body.
162 161
163 162 reactor = wireprotoframing.serverreactor(ui)
164 163 states = []
165 164
166 165 while True:
167 166 frame = wireprotoframing.readframe(req.bodyfh)
168 167
169 168 if not frame:
170 169 states.append(b'received: <no frame>')
171 170 break
172 171
173 172 states.append(b'received: %d %d %d %s' % (frame.typeid, frame.flags,
174 173 frame.requestid,
175 174 frame.payload))
176 175
177 176 action, meta = reactor.onframerecv(frame)
178 states.append(json.dumps((action, meta), sort_keys=True,
179 separators=(', ', ': ')))
177 states.append(templatefilters.json((action, meta)))
180 178
181 179 action, meta = reactor.oninputeof()
182 180 meta['action'] = action
183 states.append(json.dumps(meta, sort_keys=True, separators=(', ',': ')))
181 states.append(templatefilters.json(meta))
184 182
185 183 res.status = b'200 OK'
186 184 res.headers[b'Content-Type'] = b'text/plain'
187 185 res.setbodybytes(b'\n'.join(states))
188 186
189 187 def _processhttpv2request(ui, repo, req, res, authedperm, reqcommand, proto):
190 188 """Post-validation handler for HTTPv2 requests.
191 189
192 190 Called when the HTTP request contains unified frame-based protocol
193 191 frames for evaluation.
194 192 """
195 193 # TODO Some HTTP clients are full duplex and can receive data before
196 194 # the entire request is transmitted. Figure out a way to indicate support
197 195 # for that so we can opt into full duplex mode.
198 196 reactor = wireprotoframing.serverreactor(ui, deferoutput=True)
199 197 seencommand = False
200 198
201 199 outstream = None
202 200
203 201 while True:
204 202 frame = wireprotoframing.readframe(req.bodyfh)
205 203 if not frame:
206 204 break
207 205
208 206 action, meta = reactor.onframerecv(frame)
209 207
210 208 if action == 'wantframe':
211 209 # Need more data before we can do anything.
212 210 continue
213 211 elif action == 'runcommand':
214 212 # Defer creating output stream because we need to wait for
215 213 # protocol settings frames so proper encoding can be applied.
216 214 if not outstream:
217 215 outstream = reactor.makeoutputstream()
218 216
219 217 sentoutput = _httpv2runcommand(ui, repo, req, res, authedperm,
220 218 reqcommand, reactor, outstream,
221 219 meta, issubsequent=seencommand)
222 220
223 221 if sentoutput:
224 222 return
225 223
226 224 seencommand = True
227 225
228 226 elif action == 'error':
229 227 # TODO define proper error mechanism.
230 228 res.status = b'200 OK'
231 229 res.headers[b'Content-Type'] = b'text/plain'
232 230 res.setbodybytes(meta['message'] + b'\n')
233 231 return
234 232 else:
235 233 raise error.ProgrammingError(
236 234 'unhandled action from frame processor: %s' % action)
237 235
238 236 action, meta = reactor.oninputeof()
239 237 if action == 'sendframes':
240 238 # We assume we haven't started sending the response yet. If we're
241 239 # wrong, the response type will raise an exception.
242 240 res.status = b'200 OK'
243 241 res.headers[b'Content-Type'] = FRAMINGTYPE
244 242 res.setbodygen(meta['framegen'])
245 243 elif action == 'noop':
246 244 pass
247 245 else:
248 246 raise error.ProgrammingError('unhandled action from frame processor: %s'
249 247 % action)
250 248
251 249 def _httpv2runcommand(ui, repo, req, res, authedperm, reqcommand, reactor,
252 250 outstream, command, issubsequent):
253 251 """Dispatch a wire protocol command made from HTTPv2 requests.
254 252
255 253 The authenticated permission (``authedperm``) along with the original
256 254 command from the URL (``reqcommand``) are passed in.
257 255 """
258 256 # We already validated that the session has permissions to perform the
259 257 # actions in ``authedperm``. In the unified frame protocol, the canonical
260 258 # command to run is expressed in a frame. However, the URL also requested
261 259 # to run a specific command. We need to be careful that the command we
262 260 # run doesn't have permissions requirements greater than what was granted
263 261 # by ``authedperm``.
264 262 #
265 263 # Our rule for this is we only allow one command per HTTP request and
266 264 # that command must match the command in the URL. However, we make
267 265 # an exception for the ``multirequest`` URL. This URL is allowed to
268 266 # execute multiple commands. We double check permissions of each command
269 267 # as it is invoked to ensure there is no privilege escalation.
270 268 # TODO consider allowing multiple commands to regular command URLs
271 269 # iff each command is the same.
272 270
273 271 proto = httpv2protocolhandler(req, ui, args=command['args'])
274 272
275 273 if reqcommand == b'multirequest':
276 274 if not COMMANDS.commandavailable(command['command'], proto):
277 275 # TODO proper error mechanism
278 276 res.status = b'200 OK'
279 277 res.headers[b'Content-Type'] = b'text/plain'
280 278 res.setbodybytes(_('wire protocol command not available: %s') %
281 279 command['command'])
282 280 return True
283 281
284 282 # TODO don't use assert here, since it may be elided by -O.
285 283 assert authedperm in (b'ro', b'rw')
286 284 wirecommand = COMMANDS[command['command']]
287 285 assert wirecommand.permission in ('push', 'pull')
288 286
289 287 if authedperm == b'ro' and wirecommand.permission != 'pull':
290 288 # TODO proper error mechanism
291 289 res.status = b'403 Forbidden'
292 290 res.headers[b'Content-Type'] = b'text/plain'
293 291 res.setbodybytes(_('insufficient permissions to execute '
294 292 'command: %s') % command['command'])
295 293 return True
296 294
297 295 # TODO should we also call checkperm() here? Maybe not if we're going
298 296 # to overhaul that API. The granted scope from the URL check should
299 297 # be good enough.
300 298
301 299 else:
302 300 # Don't allow multiple commands outside of ``multirequest`` URL.
303 301 if issubsequent:
304 302 # TODO proper error mechanism
305 303 res.status = b'200 OK'
306 304 res.headers[b'Content-Type'] = b'text/plain'
307 305 res.setbodybytes(_('multiple commands cannot be issued to this '
308 306 'URL'))
309 307 return True
310 308
311 309 if reqcommand != command['command']:
312 310 # TODO define proper error mechanism
313 311 res.status = b'200 OK'
314 312 res.headers[b'Content-Type'] = b'text/plain'
315 313 res.setbodybytes(_('command in frame must match command in URL'))
316 314 return True
317 315
318 316 res.status = b'200 OK'
319 317 res.headers[b'Content-Type'] = FRAMINGTYPE
320 318
321 319 try:
322 320 objs = dispatch(repo, proto, command['command'], command['redirect'])
323 321
324 322 action, meta = reactor.oncommandresponsereadyobjects(
325 323 outstream, command['requestid'], objs)
326 324
327 325 except error.WireprotoCommandError as e:
328 326 action, meta = reactor.oncommanderror(
329 327 outstream, command['requestid'], e.message, e.messageargs)
330 328
331 329 except Exception as e:
332 330 action, meta = reactor.onservererror(
333 331 outstream, command['requestid'],
334 332 _('exception when invoking command: %s') %
335 333 stringutil.forcebytestr(e))
336 334
337 335 if action == 'sendframes':
338 336 res.setbodygen(meta['framegen'])
339 337 return True
340 338 elif action == 'noop':
341 339 return False
342 340 else:
343 341 raise error.ProgrammingError('unhandled event from reactor: %s' %
344 342 action)
345 343
346 344 def getdispatchrepo(repo, proto, command):
347 345 return repo.filtered('served')
348 346
349 347 def dispatch(repo, proto, command, redirect):
350 348 """Run a wire protocol command.
351 349
352 350 Returns an iterable of objects that will be sent to the client.
353 351 """
354 352 repo = getdispatchrepo(repo, proto, command)
355 353
356 354 entry = COMMANDS[command]
357 355 func = entry.func
358 356 spec = entry.args
359 357
360 358 args = proto.getargs(spec)
361 359
362 360 # There is some duplicate boilerplate code here for calling the command and
363 361 # emitting objects. It is either that or a lot of indented code that looks
364 362 # like a pyramid (since there are a lot of code paths that result in not
365 363 # using the cacher).
366 364 callcommand = lambda: func(repo, proto, **pycompat.strkwargs(args))
367 365
368 366 # Request is not cacheable. Don't bother instantiating a cacher.
369 367 if not entry.cachekeyfn:
370 368 for o in callcommand():
371 369 yield o
372 370 return
373 371
374 372 if redirect:
375 373 redirecttargets = redirect[b'targets']
376 374 redirecthashes = redirect[b'hashes']
377 375 else:
378 376 redirecttargets = []
379 377 redirecthashes = []
380 378
381 379 cacher = makeresponsecacher(repo, proto, command, args,
382 380 cborutil.streamencode,
383 381 redirecttargets=redirecttargets,
384 382 redirecthashes=redirecthashes)
385 383
386 384 # But we have no cacher. Do default handling.
387 385 if not cacher:
388 386 for o in callcommand():
389 387 yield o
390 388 return
391 389
392 390 with cacher:
393 391 cachekey = entry.cachekeyfn(repo, proto, cacher,
394 392 **pycompat.strkwargs(args))
395 393
396 394 # No cache key or the cacher doesn't like it. Do default handling.
397 395 if cachekey is None or not cacher.setcachekey(cachekey):
398 396 for o in callcommand():
399 397 yield o
400 398 return
401 399
402 400 # Serve it from the cache, if possible.
403 401 cached = cacher.lookup()
404 402
405 403 if cached:
406 404 for o in cached['objs']:
407 405 yield o
408 406 return
409 407
410 408 # Else call the command and feed its output into the cacher, allowing
411 409 # the cacher to buffer/mutate objects as it desires.
412 410 for o in callcommand():
413 411 for o in cacher.onobject(o):
414 412 yield o
415 413
416 414 for o in cacher.onfinished():
417 415 yield o
418 416
419 417 @interfaceutil.implementer(wireprototypes.baseprotocolhandler)
420 418 class httpv2protocolhandler(object):
421 419 def __init__(self, req, ui, args=None):
422 420 self._req = req
423 421 self._ui = ui
424 422 self._args = args
425 423
426 424 @property
427 425 def name(self):
428 426 return HTTP_WIREPROTO_V2
429 427
430 428 def getargs(self, args):
431 429 # First look for args that were passed but aren't registered on this
432 430 # command.
433 431 extra = set(self._args) - set(args)
434 432 if extra:
435 433 raise error.WireprotoCommandError(
436 434 'unsupported argument to command: %s' %
437 435 ', '.join(sorted(extra)))
438 436
439 437 # And look for required arguments that are missing.
440 438 missing = {a for a in args if args[a]['required']} - set(self._args)
441 439
442 440 if missing:
443 441 raise error.WireprotoCommandError(
444 442 'missing required arguments: %s' % ', '.join(sorted(missing)))
445 443
446 444 # Now derive the arguments to pass to the command, taking into
447 445 # account the arguments specified by the client.
448 446 data = {}
449 447 for k, meta in sorted(args.items()):
450 448 # This argument wasn't passed by the client.
451 449 if k not in self._args:
452 450 data[k] = meta['default']()
453 451 continue
454 452
455 453 v = self._args[k]
456 454
457 455 # Sets may be expressed as lists. Silently normalize.
458 456 if meta['type'] == 'set' and isinstance(v, list):
459 457 v = set(v)
460 458
461 459 # TODO consider more/stronger type validation.
462 460
463 461 data[k] = v
464 462
465 463 return data
466 464
467 465 def getprotocaps(self):
468 466 # Protocol capabilities are currently not implemented for HTTP V2.
469 467 return set()
470 468
471 469 def getpayload(self):
472 470 raise NotImplementedError
473 471
474 472 @contextlib.contextmanager
475 473 def mayberedirectstdio(self):
476 474 raise NotImplementedError
477 475
478 476 def client(self):
479 477 raise NotImplementedError
480 478
481 479 def addcapabilities(self, repo, caps):
482 480 return caps
483 481
484 482 def checkperm(self, perm):
485 483 raise NotImplementedError
486 484
487 485 def httpv2apidescriptor(req, repo):
488 486 proto = httpv2protocolhandler(req, repo.ui)
489 487
490 488 return _capabilitiesv2(repo, proto)
491 489
492 490 def _capabilitiesv2(repo, proto):
493 491 """Obtain the set of capabilities for version 2 transports.
494 492
495 493 These capabilities are distinct from the capabilities for version 1
496 494 transports.
497 495 """
498 496 caps = {
499 497 'commands': {},
500 498 'framingmediatypes': [FRAMINGTYPE],
501 499 'pathfilterprefixes': set(narrowspec.VALID_PREFIXES),
502 500 }
503 501
504 502 for command, entry in COMMANDS.items():
505 503 args = {}
506 504
507 505 for arg, meta in entry.args.items():
508 506 args[arg] = {
509 507 # TODO should this be a normalized type using CBOR's
510 508 # terminology?
511 509 b'type': meta['type'],
512 510 b'required': meta['required'],
513 511 }
514 512
515 513 if not meta['required']:
516 514 args[arg][b'default'] = meta['default']()
517 515
518 516 if meta['validvalues']:
519 517 args[arg][b'validvalues'] = meta['validvalues']
520 518
521 519 # TODO this type of check should be defined in a per-command callback.
522 520 if (command == b'rawstorefiledata'
523 521 and not streamclone.allowservergeneration(repo)):
524 522 continue
525 523
526 524 caps['commands'][command] = {
527 525 'args': args,
528 526 'permissions': [entry.permission],
529 527 }
530 528
531 529 if entry.extracapabilitiesfn:
532 530 extracaps = entry.extracapabilitiesfn(repo, proto)
533 531 caps['commands'][command].update(extracaps)
534 532
535 533 caps['rawrepoformats'] = sorted(repo.requirements &
536 534 repo.supportedformats)
537 535
538 536 targets = getadvertisedredirecttargets(repo, proto)
539 537 if targets:
540 538 caps[b'redirect'] = {
541 539 b'targets': [],
542 540 b'hashes': [b'sha256', b'sha1'],
543 541 }
544 542
545 543 for target in targets:
546 544 entry = {
547 545 b'name': target['name'],
548 546 b'protocol': target['protocol'],
549 547 b'uris': target['uris'],
550 548 }
551 549
552 550 for key in ('snirequired', 'tlsversions'):
553 551 if key in target:
554 552 entry[key] = target[key]
555 553
556 554 caps[b'redirect'][b'targets'].append(entry)
557 555
558 556 return proto.addcapabilities(repo, caps)
559 557
560 558 def getadvertisedredirecttargets(repo, proto):
561 559 """Obtain a list of content redirect targets.
562 560
563 561 Returns a list containing potential redirect targets that will be
564 562 advertised in capabilities data. Each dict MUST have the following
565 563 keys:
566 564
567 565 name
568 566 The name of this redirect target. This is the identifier clients use
569 567 to refer to a target. It is transferred as part of every command
570 568 request.
571 569
572 570 protocol
573 571 Network protocol used by this target. Typically this is the string
574 572 in front of the ``://`` in a URL. e.g. ``https``.
575 573
576 574 uris
577 575 List of representative URIs for this target. Clients can use the
578 576 URIs to test parsing for compatibility or for ordering preference
579 577 for which target to use.
580 578
581 579 The following optional keys are recognized:
582 580
583 581 snirequired
584 582 Bool indicating if Server Name Indication (SNI) is required to
585 583 connect to this target.
586 584
587 585 tlsversions
588 586 List of bytes indicating which TLS versions are supported by this
589 587 target.
590 588
591 589 By default, clients reflect the target order advertised by servers
592 590 and servers will use the first client-advertised target when picking
593 591 a redirect target. So targets should be advertised in the order the
594 592 server prefers they be used.
595 593 """
596 594 return []
597 595
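Since the contract documented in the getadvertisedredirecttargets() docstring above is easiest to read as data, here is a hedged sketch of a single advertised target. The required keys (name, protocol, uris) and optional keys (snirequired, tlsversions) follow the docstring; every concrete value is invented for illustration:

```python
# Hypothetical return value for getadvertisedredirecttargets().
[
    {
        b'name': b'cdn',
        b'protocol': b'https',
        b'uris': [b'https://cdn.example.com/hgcontent/'],
        # Optional keys:
        b'snirequired': True,
        b'tlsversions': [b'1.2', b'1.3'],
    },
]
```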
598 596 def wireprotocommand(name, args=None, permission='push', cachekeyfn=None,
599 597 extracapabilitiesfn=None):
600 598 """Decorator to declare a wire protocol command.
601 599
602 600 ``name`` is the name of the wire protocol command being provided.
603 601
604 602 ``args`` is a dict defining arguments accepted by the command. Keys are
605 603 the argument name. Values are dicts with the following keys:
606 604
607 605 ``type``
608 606 The argument data type. Must be one of the following string
609 607 literals: ``bytes``, ``int``, ``list``, ``dict``, ``set``,
610 608 or ``bool``.
611 609
612 610 ``default``
613 611 A callable returning the default value for this argument. If not
614 612 specified, ``None`` will be the default value.
615 613
616 614 ``example``
617 615 An example value for this argument.
618 616
619 617 ``validvalues``
620 618 Set of recognized values for this argument.
621 619
622 620 ``permission`` defines the permission type needed to run this command.
623 621 Can be ``push`` or ``pull``. These roughly map to read-write and read-only,
624 622 respectively. Default is to assume command requires ``push`` permissions
625 623 because otherwise commands not declaring their permissions could modify
626 624 a repository that is supposed to be read-only.
627 625
628 626 ``cachekeyfn`` defines an optional callable that can derive the
629 627 cache key for this request.
630 628
631 629 ``extracapabilitiesfn`` defines an optional callable that defines extra
632 630 command capabilities/parameters that are advertised next to the command
633 631 in the capabilities data structure describing the server. The callable
634 632 receives as arguments the repository and protocol objects. It returns
635 633 a dict of extra fields to add to the command descriptor.
636 634
637 635 Wire protocol commands are generators of objects to be serialized and
638 636 sent to the client.
639 637
640 638 If a command raises an uncaught exception, this will be translated into
641 639 a command error.
642 640
643 641 All commands can opt in to being cacheable by defining a function
644 642 (``cachekeyfn``) that is called to derive a cache key. This function
645 643 receives the same arguments as the command itself plus a ``cacher``
646 644 argument containing the active cacher for the request and returns a bytes
647 645 containing the key in a cache the response to this command may be cached
648 646 under.
649 647 """
650 648 transports = {k for k, v in wireprototypes.TRANSPORTS.items()
651 649 if v['version'] == 2}
652 650
653 651 if permission not in ('push', 'pull'):
654 652 raise error.ProgrammingError('invalid wire protocol permission; '
655 653 'got %s; expected "push" or "pull"' %
656 654 permission)
657 655
658 656 if args is None:
659 657 args = {}
660 658
661 659 if not isinstance(args, dict):
662 660 raise error.ProgrammingError('arguments for version 2 commands '
663 661 'must be declared as dicts')
664 662
665 663 for arg, meta in args.items():
666 664 if arg == '*':
667 665 raise error.ProgrammingError('* argument name not allowed on '
668 666 'version 2 commands')
669 667
670 668 if not isinstance(meta, dict):
671 669 raise error.ProgrammingError('arguments for version 2 commands '
672 670 'must declare metadata as a dict')
673 671
674 672 if 'type' not in meta:
675 673 raise error.ProgrammingError('%s argument for command %s does not '
676 674 'declare type field' % (arg, name))
677 675
678 676 if meta['type'] not in ('bytes', 'int', 'list', 'dict', 'set', 'bool'):
679 677 raise error.ProgrammingError('%s argument for command %s has '
680 678 'illegal type: %s' % (arg, name,
681 679 meta['type']))
682 680
683 681 if 'example' not in meta:
684 682 raise error.ProgrammingError('%s argument for command %s does not '
685 683 'declare example field' % (arg, name))
686 684
687 685 meta['required'] = 'default' not in meta
688 686
689 687 meta.setdefault('default', lambda: None)
690 688 meta.setdefault('validvalues', None)
691 689
692 690 def register(func):
693 691 if name in COMMANDS:
694 692 raise error.ProgrammingError('%s command already registered '
695 693 'for version 2' % name)
696 694
697 695 COMMANDS[name] = wireprototypes.commandentry(
698 696 func, args=args, transports=transports, permission=permission,
699 697 cachekeyfn=cachekeyfn, extracapabilitiesfn=extracapabilitiesfn)
700 698
701 699 return func
702 700
703 701 return register
704 702
705 703 def makecommandcachekeyfn(command, localversion=None, allargs=False):
706 704 """Construct a cache key derivation function with common features.
707 705
708 706 By default, the cache key is a hash of:
709 707
710 708 * The command name.
711 709 * A global cache version number.
712 710 * A local cache version number (passed via ``localversion``).
713 711 * All the arguments passed to the command.
714 712 * The media type used.
715 713 * Wire protocol version string.
716 714 * The repository path.
717 715 """
718 716 if not allargs:
719 717 raise error.ProgrammingError('only allargs=True is currently supported')
720 718
721 719 if localversion is None:
722 720 raise error.ProgrammingError('must set localversion argument value')
723 721
724 722 def cachekeyfn(repo, proto, cacher, **args):
725 723 spec = COMMANDS[command]
726 724
727 725 # Commands that mutate the repo can not be cached.
728 726 if spec.permission == 'push':
729 727 return None
730 728
731 729 # TODO config option to disable caching.
732 730
733 731 # Our key derivation strategy is to construct a data structure
734 732 # holding everything that could influence cacheability and to hash
735 733 # the CBOR representation of that. Using CBOR seems like it might
736 734 # be overkill. However, simpler hashing mechanisms are prone to
737 735 # duplicate input issues. e.g. if you just concatenate two values,
738 736 # "foo"+"bar" is identical to "fo"+"obar". Using CBOR provides
739 737 # "padding" between values and prevents these problems.
740 738
741 739 # Seed the hash with various data.
742 740 state = {
743 741 # To invalidate all cache keys.
744 742 b'globalversion': GLOBAL_CACHE_VERSION,
745 743 # More granular cache key invalidation.
746 744 b'localversion': localversion,
747 745 # Cache keys are segmented by command.
748 746 b'command': command,
749 747 # Throw in the media type and API version strings so changes
750 748 # to exchange semantics invalidate the cache.
751 749 b'mediatype': FRAMINGTYPE,
752 750 b'version': HTTP_WIREPROTO_V2,
753 751 # So same requests for different repos don't share cache keys.
754 752 b'repo': repo.root,
755 753 }
756 754
757 755 # The arguments passed to us will have already been normalized.
758 756 # Default values will be set, etc. This is important because it
759 757 # means that it doesn't matter if clients send an explicit argument
760 758 # or rely on the default value: it will all normalize to the same
761 759 # set of arguments on the server and therefore the same cache key.
762 760 #
763 761 # Arguments by their very nature must support being encoded to CBOR.
764 762 # And the CBOR encoder is deterministic. So we hash the arguments
765 763 # by feeding the CBOR of their representation into the hasher.
766 764 if allargs:
767 765 state[b'args'] = pycompat.byteskwargs(args)
768 766
769 767 cacher.adjustcachekeystate(state)
770 768
771 769 hasher = hashlib.sha1()
772 770 for chunk in cborutil.streamencode(state):
773 771 hasher.update(chunk)
774 772
775 773 return pycompat.sysbytes(hasher.hexdigest())
776 774
777 775 return cachekeyfn
778 776
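The duplicate-input concern called out in the comments inside cachekeyfn above ("foo"+"bar" versus "fo"+"obar") is easy to demonstrate. A minimal sketch, using a simple length prefix as a stand-in for the CBOR framing the real code relies on via cborutil.streamencode:

```python
import hashlib

# Naive concatenation: two different argument lists feed identical bytes
# to the hasher, so their cache keys collide.
assert (hashlib.sha1(b'foo' + b'bar').hexdigest()
        == hashlib.sha1(b'fo' + b'obar').hexdigest())

# Any self-delimiting encoding removes the ambiguity because each value
# carries its own framing.
def lengthprefixed(values):
    return b''.join(b'%d:' % len(v) + v for v in values)

assert lengthprefixed([b'foo', b'bar']) != lengthprefixed([b'fo', b'obar'])
```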
779 777 def makeresponsecacher(repo, proto, command, args, objencoderfn,
780 778 redirecttargets, redirecthashes):
781 779 """Construct a cacher for a cacheable command.
782 780
783 781 Returns an ``iwireprotocolcommandcacher`` instance.
784 782
785 783 Extensions can monkeypatch this function to provide custom caching
786 784 backends.
787 785 """
788 786 return None
789 787
790 788 def resolvenodes(repo, revisions):
791 789 """Resolve nodes from a revisions specifier data structure."""
792 790 cl = repo.changelog
793 791 clhasnode = cl.hasnode
794 792
795 793 seen = set()
796 794 nodes = []
797 795
798 796 if not isinstance(revisions, list):
799 797 raise error.WireprotoCommandError('revisions must be defined as an '
800 798 'array')
801 799
802 800 for spec in revisions:
803 801 if b'type' not in spec:
804 802 raise error.WireprotoCommandError(
805 803 'type key not present in revision specifier')
806 804
807 805 typ = spec[b'type']
808 806
809 807 if typ == b'changesetexplicit':
810 808 if b'nodes' not in spec:
811 809 raise error.WireprotoCommandError(
812 810 'nodes key not present in changesetexplicit revision '
813 811 'specifier')
814 812
815 813 for node in spec[b'nodes']:
816 814 if node not in seen:
817 815 nodes.append(node)
818 816 seen.add(node)
819 817
820 818 elif typ == b'changesetexplicitdepth':
821 819 for key in (b'nodes', b'depth'):
822 820 if key not in spec:
823 821 raise error.WireprotoCommandError(
824 822 '%s key not present in changesetexplicitdepth revision '
825 823 'specifier', (key,))
826 824
827 825 for rev in repo.revs(b'ancestors(%ln, %s)', spec[b'nodes'],
828 826 spec[b'depth'] - 1):
829 827 node = cl.node(rev)
830 828
831 829 if node not in seen:
832 830 nodes.append(node)
833 831 seen.add(node)
834 832
835 833 elif typ == b'changesetdagrange':
836 834 for key in (b'roots', b'heads'):
837 835 if key not in spec:
838 836 raise error.WireprotoCommandError(
839 837 '%s key not present in changesetdagrange revision '
840 838 'specifier', (key,))
841 839
842 840 if not spec[b'heads']:
843 841 raise error.WireprotoCommandError(
844 842 'heads key in changesetdagrange cannot be empty')
845 843
846 844 if spec[b'roots']:
847 845 common = [n for n in spec[b'roots'] if clhasnode(n)]
848 846 else:
849 847 common = [nullid]
850 848
851 849 for n in discovery.outgoing(repo, common, spec[b'heads']).missing:
852 850 if n not in seen:
853 851 nodes.append(n)
854 852 seen.add(n)
855 853
856 854 else:
857 855 raise error.WireprotoCommandError(
858 856 'unknown revision specifier type: %s', (typ,))
859 857
860 858 return nodes
861 859
862 860 @wireprotocommand('branchmap', permission='pull')
863 861 def branchmapv2(repo, proto):
864 862 yield {encoding.fromlocal(k): v
865 863 for k, v in repo.branchmap().iteritems()}
866 864
867 865 @wireprotocommand('capabilities', permission='pull')
868 866 def capabilitiesv2(repo, proto):
869 867 yield _capabilitiesv2(repo, proto)
870 868
871 869 @wireprotocommand(
872 870 'changesetdata',
873 871 args={
874 872 'revisions': {
875 873 'type': 'list',
876 874 'example': [{
877 875 b'type': b'changesetexplicit',
878 876 b'nodes': [b'abcdef...'],
879 877 }],
880 878 },
881 879 'fields': {
882 880 'type': 'set',
883 881 'default': set,
884 882 'example': {b'parents', b'revision'},
885 883 'validvalues': {b'bookmarks', b'parents', b'phase', b'revision'},
886 884 },
887 885 },
888 886 permission='pull')
889 887 def changesetdata(repo, proto, revisions, fields):
890 888 # TODO look for unknown fields and abort when they can't be serviced.
891 889 # This could probably be validated by dispatcher using validvalues.
892 890
893 891 cl = repo.changelog
894 892 outgoing = resolvenodes(repo, revisions)
895 893 publishing = repo.publishing()
896 894
897 895 if outgoing:
898 896 repo.hook('preoutgoing', throw=True, source='serve')
899 897
900 898 yield {
901 899 b'totalitems': len(outgoing),
902 900 }
903 901
904 902 # The phases of nodes already transferred to the client may have changed
905 903 # since the client last requested data. We send phase-only records
906 904 # for these revisions, if requested.
907 905 # TODO actually do this. We'll probably want to emit phase heads
908 906 # in the ancestry set of the outgoing revisions. This will ensure
909 907 # that phase updates within that set are seen.
910 908 if b'phase' in fields:
911 909 pass
912 910
913 911 nodebookmarks = {}
914 912 for mark, node in repo._bookmarks.items():
915 913 nodebookmarks.setdefault(node, set()).add(mark)
916 914
917 915 # It is already topologically sorted by revision number.
918 916 for node in outgoing:
919 917 d = {
920 918 b'node': node,
921 919 }
922 920
923 921 if b'parents' in fields:
924 922 d[b'parents'] = cl.parents(node)
925 923
926 924 if b'phase' in fields:
927 925 if publishing:
928 926 d[b'phase'] = b'public'
929 927 else:
930 928 ctx = repo[node]
931 929 d[b'phase'] = ctx.phasestr()
932 930
933 931 if b'bookmarks' in fields and node in nodebookmarks:
934 932 d[b'bookmarks'] = sorted(nodebookmarks[node])
935 933 del nodebookmarks[node]
936 934
937 935 followingmeta = []
938 936 followingdata = []
939 937
940 938 if b'revision' in fields:
941 939 revisiondata = cl.revision(node, raw=True)
942 940 followingmeta.append((b'revision', len(revisiondata)))
943 941 followingdata.append(revisiondata)
944 942
945 943 # TODO make it possible for extensions to wrap a function or register
946 944 # a handler to service custom fields.
947 945
948 946 if followingmeta:
949 947 d[b'fieldsfollowing'] = followingmeta
950 948
951 949 yield d
952 950
953 951 for extra in followingdata:
954 952 yield extra
955 953
956 954 # If requested, send bookmarks from nodes that didn't have revision
957 955 # data sent so receiver is aware of any bookmark updates.
958 956 if b'bookmarks' in fields:
959 957 for node, marks in sorted(nodebookmarks.iteritems()):
960 958 yield {
961 959 b'node': node,
962 960 b'bookmarks': sorted(marks),
963 961 }
964 962
965 963 class FileAccessError(Exception):
966 964 """Represents an error accessing a specific file."""
967 965
968 966 def __init__(self, path, msg, args):
969 967 self.path = path
970 968 self.msg = msg
971 969 self.args = args
972 970
973 971 def getfilestore(repo, proto, path):
974 972 """Obtain a file storage object for use with wire protocol.
975 973
976 974 Exists as a standalone function so extensions can monkeypatch to add
977 975 access control.
978 976 """
979 977 # This seems to work even if the file doesn't exist. So catch
980 978 # "empty" files and return an error.
981 979 fl = repo.file(path)
982 980
983 981 if not len(fl):
984 982 raise FileAccessError(path, 'unknown file: %s', (path,))
985 983
986 984 return fl
987 985
988 986 def emitfilerevisions(repo, path, revisions, linknodes, fields):
989 987 for revision in revisions:
990 988 d = {
991 989 b'node': revision.node,
992 990 }
993 991
994 992 if b'parents' in fields:
995 993 d[b'parents'] = [revision.p1node, revision.p2node]
996 994
997 995 if b'linknode' in fields:
998 996 d[b'linknode'] = linknodes[revision.node]
999 997
1000 998 followingmeta = []
1001 999 followingdata = []
1002 1000
1003 1001 if b'revision' in fields:
1004 1002 if revision.revision is not None:
1005 1003 followingmeta.append((b'revision', len(revision.revision)))
1006 1004 followingdata.append(revision.revision)
1007 1005 else:
1008 1006 d[b'deltabasenode'] = revision.basenode
1009 1007 followingmeta.append((b'delta', len(revision.delta)))
1010 1008 followingdata.append(revision.delta)
1011 1009
1012 1010 if followingmeta:
1013 1011 d[b'fieldsfollowing'] = followingmeta
1014 1012
1015 1013 yield d
1016 1014
1017 1015 for extra in followingdata:
1018 1016 yield extra
1019 1017
1020 1018 def makefilematcher(repo, pathfilter):
1021 1019 """Construct a matcher from a path filter dict."""
1022 1020
1023 1021 # Validate values.
1024 1022 if pathfilter:
1025 1023 for key in (b'include', b'exclude'):
1026 1024 for pattern in pathfilter.get(key, []):
1027 1025 if not pattern.startswith((b'path:', b'rootfilesin:')):
1028 1026 raise error.WireprotoCommandError(
1029 1027 '%s pattern must begin with `path:` or `rootfilesin:`; '
1030 1028 'got %s', (key, pattern))
1031 1029
1032 1030 if pathfilter:
1033 1031 matcher = matchmod.match(repo.root, b'',
1034 1032 include=pathfilter.get(b'include', []),
1035 1033 exclude=pathfilter.get(b'exclude', []))
1036 1034 else:
1037 1035 matcher = matchmod.match(repo.root, b'')
1038 1036
1039 1037 # Requested patterns could include files not in the local store. So
1040 1038 # filter those out.
1041 1039 return repo.narrowmatch(matcher)
1042 1040
1043 1041 @wireprotocommand(
1044 1042 'filedata',
1045 1043 args={
1046 1044 'haveparents': {
1047 1045 'type': 'bool',
1048 1046 'default': lambda: False,
1049 1047 'example': True,
1050 1048 },
1051 1049 'nodes': {
1052 1050 'type': 'list',
1053 1051 'example': [b'0123456...'],
1054 1052 },
1055 1053 'fields': {
1056 1054 'type': 'set',
1057 1055 'default': set,
1058 1056 'example': {b'parents', b'revision'},
1059 1057 'validvalues': {b'parents', b'revision', b'linknode'},
1060 1058 },
1061 1059 'path': {
1062 1060 'type': 'bytes',
1063 1061 'example': b'foo.txt',
1064 1062 }
1065 1063 },
1066 1064 permission='pull',
1067 1065 # TODO censoring a file revision won't invalidate the cache.
1068 1066 # Figure out a way to take censoring into account when deriving
1069 1067 # the cache key.
1070 1068 cachekeyfn=makecommandcachekeyfn('filedata', 1, allargs=True))
1071 1069 def filedata(repo, proto, haveparents, nodes, fields, path):
1072 1070 # TODO this API allows access to file revisions that are attached to
1073 1071 # secret changesets. filesdata does not have this problem. Maybe this
1074 1072 # API should be deleted?
1075 1073
1076 1074 try:
1077 1075 # Extensions may wish to access the protocol handler.
1078 1076 store = getfilestore(repo, proto, path)
1079 1077 except FileAccessError as e:
1080 1078 raise error.WireprotoCommandError(e.msg, e.args)
1081 1079
1082 1080 clnode = repo.changelog.node
1083 1081 linknodes = {}
1084 1082
1085 1083 # Validate requested nodes.
1086 1084 for node in nodes:
1087 1085 try:
1088 1086 store.rev(node)
1089 1087 except error.LookupError:
1090 1088 raise error.WireprotoCommandError('unknown file node: %s',
1091 1089 (hex(node),))
1092 1090
1093 1091 # TODO by creating the filectx against a specific file revision
1094 1092 # instead of changeset, linkrev() is always used. This is wrong for
1095 1093 # cases where linkrev() may refer to a hidden changeset. But since this
1096 1094 # API doesn't know anything about changesets, we're not sure how to
1097 1095 # disambiguate the linknode. Perhaps we should delete this API?
1098 1096 fctx = repo.filectx(path, fileid=node)
1099 1097 linknodes[node] = clnode(fctx.introrev())
1100 1098
1101 1099 revisions = store.emitrevisions(nodes,
1102 1100 revisiondata=b'revision' in fields,
1103 1101 assumehaveparentrevisions=haveparents)
1104 1102
1105 1103 yield {
1106 1104 b'totalitems': len(nodes),
1107 1105 }
1108 1106
1109 1107 for o in emitfilerevisions(repo, path, revisions, linknodes, fields):
1110 1108 yield o
1111 1109
1112 1110 def filesdatacapabilities(repo, proto):
1113 1111 batchsize = repo.ui.configint(
1114 1112 b'experimental', b'server.filesdata.recommended-batch-size')
1115 1113 return {
1116 1114 b'recommendedbatchsize': batchsize,
1117 1115 }
1118 1116
1119 1117 @wireprotocommand(
1120 1118 'filesdata',
1121 1119 args={
1122 1120 'haveparents': {
1123 1121 'type': 'bool',
1124 1122 'default': lambda: False,
1125 1123 'example': True,
1126 1124 },
1127 1125 'fields': {
1128 1126 'type': 'set',
1129 1127 'default': set,
1130 1128 'example': {b'parents', b'revision'},
1131 1129 'validvalues': {b'firstchangeset', b'linknode', b'parents',
1132 1130 b'revision'},
1133 1131 },
1134 1132 'pathfilter': {
1135 1133 'type': 'dict',
1136 1134 'default': lambda: None,
1137 1135 'example': {b'include': [b'path:tests']},
1138 1136 },
1139 1137 'revisions': {
1140 1138 'type': 'list',
1141 1139 'example': [{
1142 1140 b'type': b'changesetexplicit',
1143 1141 b'nodes': [b'abcdef...'],
1144 1142 }],
1145 1143 },
1146 1144 },
1147 1145 permission='pull',
1148 1146 # TODO censoring a file revision won't invalidate the cache.
1149 1147 # Figure out a way to take censoring into account when deriving
1150 1148 # the cache key.
1151 1149 cachekeyfn=makecommandcachekeyfn('filesdata', 1, allargs=True),
1152 1150 extracapabilitiesfn=filesdatacapabilities)
1153 1151 def filesdata(repo, proto, haveparents, fields, pathfilter, revisions):
1154 1152 # TODO This should operate on a repo that exposes obsolete changesets. There
1155 1153 # is a race between a client making a push that obsoletes a changeset and
1156 1154 # another client fetching files data for that changeset. If a client has a
1157 1155 # changeset, it should probably be allowed to access files data for that
1158 1156 # changeset.
1159 1157
1160 1158 outgoing = resolvenodes(repo, revisions)
1161 1159 filematcher = makefilematcher(repo, pathfilter)
1162 1160
1163 1161 # path -> {fnode: linknode}
1164 1162 fnodes = collections.defaultdict(dict)
1165 1163
1166 1164 # We collect the set of relevant file revisions by iterating the changeset
1167 1165 # revisions and either walking the set of files recorded in the changeset
1168 1166 # or by walking the manifest at that revision. There is probably room for a
1169 1167 # storage-level API to request this data, as it can be expensive to compute
1170 1168 # and would benefit from caching or alternate storage from what revlogs
1171 1169 # provide.
1172 1170 for node in outgoing:
1173 1171 ctx = repo[node]
1174 1172 mctx = ctx.manifestctx()
1175 1173 md = mctx.read()
1176 1174
1177 1175 if haveparents:
1178 1176 checkpaths = ctx.files()
1179 1177 else:
1180 1178 checkpaths = md.keys()
1181 1179
1182 1180 for path in checkpaths:
1183 1181 fnode = md[path]
1184 1182
1185 1183 if path in fnodes and fnode in fnodes[path]:
1186 1184 continue
1187 1185
1188 1186 if not filematcher(path):
1189 1187 continue
1190 1188
1191 1189 fnodes[path].setdefault(fnode, node)
1192 1190
1193 1191 yield {
1194 1192 b'totalpaths': len(fnodes),
1195 1193 b'totalitems': sum(len(v) for v in fnodes.values())
1196 1194 }
1197 1195
1198 1196 for path, filenodes in sorted(fnodes.items()):
1199 1197 try:
1200 1198 store = getfilestore(repo, proto, path)
1201 1199 except FileAccessError as e:
1202 1200 raise error.WireprotoCommandError(e.msg, e.args)
1203 1201
1204 1202 yield {
1205 1203 b'path': path,
1206 1204 b'totalitems': len(filenodes),
1207 1205 }
1208 1206
1209 1207 revisions = store.emitrevisions(filenodes.keys(),
1210 1208 revisiondata=b'revision' in fields,
1211 1209 assumehaveparentrevisions=haveparents)
1212 1210
1213 1211 for o in emitfilerevisions(repo, path, revisions, filenodes, fields):
1214 1212 yield o
1215 1213
1216 1214 @wireprotocommand(
1217 1215 'heads',
1218 1216 args={
1219 1217 'publiconly': {
1220 1218 'type': 'bool',
1221 1219 'default': lambda: False,
1222 1220 'example': False,
1223 1221 },
1224 1222 },
1225 1223 permission='pull')
1226 1224 def headsv2(repo, proto, publiconly):
1227 1225 if publiconly:
1228 1226 repo = repo.filtered('immutable')
1229 1227
1230 1228 yield repo.heads()
1231 1229
1232 1230 @wireprotocommand(
1233 1231 'known',
1234 1232 args={
1235 1233 'nodes': {
1236 1234 'type': 'list',
1237 1235 'default': list,
1238 1236 'example': [b'deadbeef'],
1239 1237 },
1240 1238 },
1241 1239 permission='pull')
1242 1240 def knownv2(repo, proto, nodes):
1243 1241 result = b''.join(b'1' if n else b'0' for n in repo.known(nodes))
1244 1242 yield result
1245 1243
1246 1244 @wireprotocommand(
1247 1245 'listkeys',
1248 1246 args={
1249 1247 'namespace': {
1250 1248 'type': 'bytes',
1251 1249 'example': b'ns',
1252 1250 },
1253 1251 },
1254 1252 permission='pull')
1255 1253 def listkeysv2(repo, proto, namespace):
1256 1254 keys = repo.listkeys(encoding.tolocal(namespace))
1257 1255 keys = {encoding.fromlocal(k): encoding.fromlocal(v)
1258 1256 for k, v in keys.iteritems()}
1259 1257
1260 1258 yield keys
1261 1259
1262 1260 @wireprotocommand(
1263 1261 'lookup',
1264 1262 args={
1265 1263 'key': {
1266 1264 'type': 'bytes',
1267 1265 'example': b'foo',
1268 1266 },
1269 1267 },
1270 1268 permission='pull')
1271 1269 def lookupv2(repo, proto, key):
1272 1270 key = encoding.tolocal(key)
1273 1271
1274 1272 # TODO handle exception.
1275 1273 node = repo.lookup(key)
1276 1274
1277 1275 yield node
1278 1276
1279 1277 def manifestdatacapabilities(repo, proto):
1280 1278 batchsize = repo.ui.configint(
1281 1279 b'experimental', b'server.manifestdata.recommended-batch-size')
1282 1280
1283 1281 return {
1284 1282 b'recommendedbatchsize': batchsize,
1285 1283 }
1286 1284
1287 1285 @wireprotocommand(
1288 1286 'manifestdata',
1289 1287 args={
1290 1288 'nodes': {
1291 1289 'type': 'list',
1292 1290 'example': [b'0123456...'],
1293 1291 },
1294 1292 'haveparents': {
1295 1293 'type': 'bool',
1296 1294 'default': lambda: False,
1297 1295 'example': True,
1298 1296 },
1299 1297 'fields': {
1300 1298 'type': 'set',
1301 1299 'default': set,
1302 1300 'example': {b'parents', b'revision'},
1303 1301 'validvalues': {b'parents', b'revision'},
1304 1302 },
1305 1303 'tree': {
1306 1304 'type': 'bytes',
1307 1305 'example': b'',
1308 1306 },
1309 1307 },
1310 1308 permission='pull',
1311 1309 cachekeyfn=makecommandcachekeyfn('manifestdata', 1, allargs=True),
1312 1310 extracapabilitiesfn=manifestdatacapabilities)
1313 1311 def manifestdata(repo, proto, haveparents, nodes, fields, tree):
1314 1312 store = repo.manifestlog.getstorage(tree)
1315 1313
1316 1314 # Validate the node is known and abort on unknown revisions.
1317 1315 for node in nodes:
1318 1316 try:
1319 1317 store.rev(node)
1320 1318 except error.LookupError:
1321 1319 raise error.WireprotoCommandError(
1322 1320 'unknown node: %s', (node,))
1323 1321
1324 1322 revisions = store.emitrevisions(nodes,
1325 1323 revisiondata=b'revision' in fields,
1326 1324 assumehaveparentrevisions=haveparents)
1327 1325
1328 1326 yield {
1329 1327 b'totalitems': len(nodes),
1330 1328 }
1331 1329
1332 1330 for revision in revisions:
1333 1331 d = {
1334 1332 b'node': revision.node,
1335 1333 }
1336 1334
1337 1335 if b'parents' in fields:
1338 1336 d[b'parents'] = [revision.p1node, revision.p2node]
1339 1337
1340 1338 followingmeta = []
1341 1339 followingdata = []
1342 1340
1343 1341 if b'revision' in fields:
1344 1342 if revision.revision is not None:
1345 1343 followingmeta.append((b'revision', len(revision.revision)))
1346 1344 followingdata.append(revision.revision)
1347 1345 else:
1348 1346 d[b'deltabasenode'] = revision.basenode
1349 1347 followingmeta.append((b'delta', len(revision.delta)))
1350 1348 followingdata.append(revision.delta)
1351 1349
1352 1350 if followingmeta:
1353 1351 d[b'fieldsfollowing'] = followingmeta
1354 1352
1355 1353 yield d
1356 1354
1357 1355 for extra in followingdata:
1358 1356 yield extra
1359 1357
1360 1358 @wireprotocommand(
1361 1359 'pushkey',
1362 1360 args={
1363 1361 'namespace': {
1364 1362 'type': 'bytes',
1365 1363 'example': b'ns',
1366 1364 },
1367 1365 'key': {
1368 1366 'type': 'bytes',
1369 1367 'example': b'key',
1370 1368 },
1371 1369 'old': {
1372 1370 'type': 'bytes',
1373 1371 'example': b'old',
1374 1372 },
1375 1373 'new': {
1376 1374 'type': 'bytes',
1377 1375 'example': 'new',
1378 1376 },
1379 1377 },
1380 1378 permission='push')
1381 1379 def pushkeyv2(repo, proto, namespace, key, old, new):
1382 1380 # TODO handle ui output redirection
1383 1381 yield repo.pushkey(encoding.tolocal(namespace),
1384 1382 encoding.tolocal(key),
1385 1383 encoding.tolocal(old),
1386 1384 encoding.tolocal(new))
1387 1385
1388 1386
1389 1387 @wireprotocommand(
1390 1388 'rawstorefiledata',
1391 1389 args={
1392 1390 'files': {
1393 1391 'type': 'list',
1394 1392 'example': [b'changelog', b'manifestlog'],
1395 1393 },
1396 1394 'pathfilter': {
1397 1395 'type': 'list',
1398 1396 'default': lambda: None,
1399 1397 'example': {b'include': [b'path:tests']},
1400 1398 },
1401 1399 },
1402 1400 permission='pull')
1403 1401 def rawstorefiledata(repo, proto, files, pathfilter):
1404 1402 if not streamclone.allowservergeneration(repo):
1405 1403 raise error.WireprotoCommandError(b'stream clone is disabled')
1406 1404
1407 1405 # TODO support dynamically advertising what store files "sets" are
1408 1406 # available. For now, we support changelog, manifestlog, and files.
1409 1407 files = set(files)
1410 1408 allowedfiles = {b'changelog', b'manifestlog'}
1411 1409
1412 1410 unsupported = files - allowedfiles
1413 1411 if unsupported:
1414 1412 raise error.WireprotoCommandError(b'unknown file type: %s',
1415 1413 (b', '.join(sorted(unsupported)),))
1416 1414
1417 1415 with repo.lock():
1418 1416 topfiles = list(repo.store.topfiles())
1419 1417
1420 1418 sendfiles = []
1421 1419 totalsize = 0
1422 1420
1423 1421 # TODO this is a bunch of storage layer interface abstractions because
1424 1422 # it assumes revlogs.
1425 1423 for name, encodedname, size in topfiles:
1426 1424 if b'changelog' in files and name.startswith(b'00changelog'):
1427 1425 pass
1428 1426 elif b'manifestlog' in files and name.startswith(b'00manifest'):
1429 1427 pass
1430 1428 else:
1431 1429 continue
1432 1430
1433 1431 sendfiles.append((b'store', name, size))
1434 1432 totalsize += size
1435 1433
1436 1434 yield {
1437 1435 b'filecount': len(sendfiles),
1438 1436 b'totalsize': totalsize,
1439 1437 }
1440 1438
1441 1439 for location, name, size in sendfiles:
1442 1440 yield {
1443 1441 b'location': location,
1444 1442 b'path': name,
1445 1443 b'size': size,
1446 1444 }
1447 1445
1448 1446 # We have to use a closure for this to ensure the context manager is
1449 1447 # closed only after sending the final chunk.
1450 1448 def getfiledata():
1451 1449 with repo.svfs(name, 'rb', auditpath=False) as fh:
1452 1450 for chunk in util.filechunkiter(fh, limit=size):
1453 1451 yield chunk
1454 1452
1455 1453 yield wireprototypes.indefinitebytestringresponse(
1456 1454 getfiledata())
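For reference, the debugreflect endpoint exercised by this change is gated behind the config knob checked via ui.configbool('experimental', 'web.api.debugreflect') near the top of the file. A minimal hgrc sketch to enable it on a test server; the section and key spelling follow that configbool call, and everything else about the server setup is assumed:

```
[experimental]
web.api.debugreflect = true
```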