##// END OF EJS Templates
wireprotov2server: port to emitrevisions()...
Gregory Szorc -
r39900:7b752bf0 default
parent child Browse files
Show More
@@ -1,1109 +1,984 b''
1 1 # Copyright 21 May 2005 - (c) 2005 Jake Edge <jake@edge2.net>
2 2 # Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
3 3 #
4 4 # This software may be used and distributed according to the terms of the
5 5 # GNU General Public License version 2 or any later version.
6 6
7 7 from __future__ import absolute_import
8 8
9 9 import contextlib
10 10
11 11 from .i18n import _
12 12 from .node import (
13 13 hex,
14 14 nullid,
15 nullrev,
16 15 )
17 16 from . import (
18 changegroup,
19 dagop,
20 17 discovery,
21 18 encoding,
22 19 error,
23 20 narrowspec,
24 21 pycompat,
25 22 streamclone,
26 23 util,
27 24 wireprotoframing,
28 25 wireprototypes,
29 26 )
30 27 from .utils import (
31 28 interfaceutil,
32 29 stringutil,
33 30 )
34 31
35 32 FRAMINGTYPE = b'application/mercurial-exp-framing-0005'
36 33
37 34 HTTP_WIREPROTO_V2 = wireprototypes.HTTP_WIREPROTO_V2
38 35
39 36 COMMANDS = wireprototypes.commanddict()
40 37
41 38 def handlehttpv2request(rctx, req, res, checkperm, urlparts):
42 39 from .hgweb import common as hgwebcommon
43 40
44 41 # URL space looks like: <permissions>/<command>, where <permission> can
45 42 # be ``ro`` or ``rw`` to signal read-only or read-write, respectively.
46 43
47 44 # Root URL does nothing meaningful... yet.
48 45 if not urlparts:
49 46 res.status = b'200 OK'
50 47 res.headers[b'Content-Type'] = b'text/plain'
51 48 res.setbodybytes(_('HTTP version 2 API handler'))
52 49 return
53 50
54 51 if len(urlparts) == 1:
55 52 res.status = b'404 Not Found'
56 53 res.headers[b'Content-Type'] = b'text/plain'
57 54 res.setbodybytes(_('do not know how to process %s\n') %
58 55 req.dispatchpath)
59 56 return
60 57
61 58 permission, command = urlparts[0:2]
62 59
63 60 if permission not in (b'ro', b'rw'):
64 61 res.status = b'404 Not Found'
65 62 res.headers[b'Content-Type'] = b'text/plain'
66 63 res.setbodybytes(_('unknown permission: %s') % permission)
67 64 return
68 65
69 66 if req.method != 'POST':
70 67 res.status = b'405 Method Not Allowed'
71 68 res.headers[b'Allow'] = b'POST'
72 69 res.setbodybytes(_('commands require POST requests'))
73 70 return
74 71
75 72 # At some point we'll want to use our own API instead of recycling the
76 73 # behavior of version 1 of the wire protocol...
77 74 # TODO return reasonable responses - not responses that overload the
78 75 # HTTP status line message for error reporting.
79 76 try:
80 77 checkperm(rctx, req, 'pull' if permission == b'ro' else 'push')
81 78 except hgwebcommon.ErrorResponse as e:
82 79 res.status = hgwebcommon.statusmessage(e.code, pycompat.bytestr(e))
83 80 for k, v in e.headers:
84 81 res.headers[k] = v
85 82 res.setbodybytes('permission denied')
86 83 return
87 84
88 85 # We have a special endpoint to reflect the request back at the client.
89 86 if command == b'debugreflect':
90 87 _processhttpv2reflectrequest(rctx.repo.ui, rctx.repo, req, res)
91 88 return
92 89
93 90 # Extra commands that we handle that aren't really wire protocol
94 91 # commands. Think extra hard before making this hackery available to
95 92 # extension.
96 93 extracommands = {'multirequest'}
97 94
98 95 if command not in COMMANDS and command not in extracommands:
99 96 res.status = b'404 Not Found'
100 97 res.headers[b'Content-Type'] = b'text/plain'
101 98 res.setbodybytes(_('unknown wire protocol command: %s\n') % command)
102 99 return
103 100
104 101 repo = rctx.repo
105 102 ui = repo.ui
106 103
107 104 proto = httpv2protocolhandler(req, ui)
108 105
109 106 if (not COMMANDS.commandavailable(command, proto)
110 107 and command not in extracommands):
111 108 res.status = b'404 Not Found'
112 109 res.headers[b'Content-Type'] = b'text/plain'
113 110 res.setbodybytes(_('invalid wire protocol command: %s') % command)
114 111 return
115 112
116 113 # TODO consider cases where proxies may add additional Accept headers.
117 114 if req.headers.get(b'Accept') != FRAMINGTYPE:
118 115 res.status = b'406 Not Acceptable'
119 116 res.headers[b'Content-Type'] = b'text/plain'
120 117 res.setbodybytes(_('client MUST specify Accept header with value: %s\n')
121 118 % FRAMINGTYPE)
122 119 return
123 120
124 121 if req.headers.get(b'Content-Type') != FRAMINGTYPE:
125 122 res.status = b'415 Unsupported Media Type'
126 123 # TODO we should send a response with appropriate media type,
127 124 # since client does Accept it.
128 125 res.headers[b'Content-Type'] = b'text/plain'
129 126 res.setbodybytes(_('client MUST send Content-Type header with '
130 127 'value: %s\n') % FRAMINGTYPE)
131 128 return
132 129
133 130 _processhttpv2request(ui, repo, req, res, permission, command, proto)
134 131
135 132 def _processhttpv2reflectrequest(ui, repo, req, res):
136 133 """Reads unified frame protocol request and dumps out state to client.
137 134
138 135 This special endpoint can be used to help debug the wire protocol.
139 136
140 137 Instead of routing the request through the normal dispatch mechanism,
141 138 we instead read all frames, decode them, and feed them into our state
142 139 tracker. We then dump the log of all that activity back out to the
143 140 client.
144 141 """
145 142 import json
146 143
147 144 # Reflection APIs have a history of being abused, accidentally disclosing
148 145 # sensitive data, etc. So we have a config knob.
149 146 if not ui.configbool('experimental', 'web.api.debugreflect'):
150 147 res.status = b'404 Not Found'
151 148 res.headers[b'Content-Type'] = b'text/plain'
152 149 res.setbodybytes(_('debugreflect service not available'))
153 150 return
154 151
155 152 # We assume we have a unified framing protocol request body.
156 153
157 154 reactor = wireprotoframing.serverreactor()
158 155 states = []
159 156
160 157 while True:
161 158 frame = wireprotoframing.readframe(req.bodyfh)
162 159
163 160 if not frame:
164 161 states.append(b'received: <no frame>')
165 162 break
166 163
167 164 states.append(b'received: %d %d %d %s' % (frame.typeid, frame.flags,
168 165 frame.requestid,
169 166 frame.payload))
170 167
171 168 action, meta = reactor.onframerecv(frame)
172 169 states.append(json.dumps((action, meta), sort_keys=True,
173 170 separators=(', ', ': ')))
174 171
175 172 action, meta = reactor.oninputeof()
176 173 meta['action'] = action
177 174 states.append(json.dumps(meta, sort_keys=True, separators=(', ',': ')))
178 175
179 176 res.status = b'200 OK'
180 177 res.headers[b'Content-Type'] = b'text/plain'
181 178 res.setbodybytes(b'\n'.join(states))
182 179
183 180 def _processhttpv2request(ui, repo, req, res, authedperm, reqcommand, proto):
184 181 """Post-validation handler for HTTPv2 requests.
185 182
186 183 Called when the HTTP request contains unified frame-based protocol
187 184 frames for evaluation.
188 185 """
189 186 # TODO Some HTTP clients are full duplex and can receive data before
190 187 # the entire request is transmitted. Figure out a way to indicate support
191 188 # for that so we can opt into full duplex mode.
192 189 reactor = wireprotoframing.serverreactor(deferoutput=True)
193 190 seencommand = False
194 191
195 192 outstream = reactor.makeoutputstream()
196 193
197 194 while True:
198 195 frame = wireprotoframing.readframe(req.bodyfh)
199 196 if not frame:
200 197 break
201 198
202 199 action, meta = reactor.onframerecv(frame)
203 200
204 201 if action == 'wantframe':
205 202 # Need more data before we can do anything.
206 203 continue
207 204 elif action == 'runcommand':
208 205 sentoutput = _httpv2runcommand(ui, repo, req, res, authedperm,
209 206 reqcommand, reactor, outstream,
210 207 meta, issubsequent=seencommand)
211 208
212 209 if sentoutput:
213 210 return
214 211
215 212 seencommand = True
216 213
217 214 elif action == 'error':
218 215 # TODO define proper error mechanism.
219 216 res.status = b'200 OK'
220 217 res.headers[b'Content-Type'] = b'text/plain'
221 218 res.setbodybytes(meta['message'] + b'\n')
222 219 return
223 220 else:
224 221 raise error.ProgrammingError(
225 222 'unhandled action from frame processor: %s' % action)
226 223
227 224 action, meta = reactor.oninputeof()
228 225 if action == 'sendframes':
229 226 # We assume we haven't started sending the response yet. If we're
230 227 # wrong, the response type will raise an exception.
231 228 res.status = b'200 OK'
232 229 res.headers[b'Content-Type'] = FRAMINGTYPE
233 230 res.setbodygen(meta['framegen'])
234 231 elif action == 'noop':
235 232 pass
236 233 else:
237 234 raise error.ProgrammingError('unhandled action from frame processor: %s'
238 235 % action)
239 236
240 237 def _httpv2runcommand(ui, repo, req, res, authedperm, reqcommand, reactor,
241 238 outstream, command, issubsequent):
242 239 """Dispatch a wire protocol command made from HTTPv2 requests.
243 240
244 241 The authenticated permission (``authedperm``) along with the original
245 242 command from the URL (``reqcommand``) are passed in.
246 243 """
247 244 # We already validated that the session has permissions to perform the
248 245 # actions in ``authedperm``. In the unified frame protocol, the canonical
249 246 # command to run is expressed in a frame. However, the URL also requested
250 247 # to run a specific command. We need to be careful that the command we
251 248 # run doesn't have permissions requirements greater than what was granted
252 249 # by ``authedperm``.
253 250 #
254 251 # Our rule for this is we only allow one command per HTTP request and
255 252 # that command must match the command in the URL. However, we make
256 253 # an exception for the ``multirequest`` URL. This URL is allowed to
257 254 # execute multiple commands. We double check permissions of each command
258 255 # as it is invoked to ensure there is no privilege escalation.
259 256 # TODO consider allowing multiple commands to regular command URLs
260 257 # iff each command is the same.
261 258
262 259 proto = httpv2protocolhandler(req, ui, args=command['args'])
263 260
264 261 if reqcommand == b'multirequest':
265 262 if not COMMANDS.commandavailable(command['command'], proto):
266 263 # TODO proper error mechanism
267 264 res.status = b'200 OK'
268 265 res.headers[b'Content-Type'] = b'text/plain'
269 266 res.setbodybytes(_('wire protocol command not available: %s') %
270 267 command['command'])
271 268 return True
272 269
273 270 # TODO don't use assert here, since it may be elided by -O.
274 271 assert authedperm in (b'ro', b'rw')
275 272 wirecommand = COMMANDS[command['command']]
276 273 assert wirecommand.permission in ('push', 'pull')
277 274
278 275 if authedperm == b'ro' and wirecommand.permission != 'pull':
279 276 # TODO proper error mechanism
280 277 res.status = b'403 Forbidden'
281 278 res.headers[b'Content-Type'] = b'text/plain'
282 279 res.setbodybytes(_('insufficient permissions to execute '
283 280 'command: %s') % command['command'])
284 281 return True
285 282
286 283 # TODO should we also call checkperm() here? Maybe not if we're going
287 284 # to overhaul that API. The granted scope from the URL check should
288 285 # be good enough.
289 286
290 287 else:
291 288 # Don't allow multiple commands outside of ``multirequest`` URL.
292 289 if issubsequent:
293 290 # TODO proper error mechanism
294 291 res.status = b'200 OK'
295 292 res.headers[b'Content-Type'] = b'text/plain'
296 293 res.setbodybytes(_('multiple commands cannot be issued to this '
297 294 'URL'))
298 295 return True
299 296
300 297 if reqcommand != command['command']:
301 298 # TODO define proper error mechanism
302 299 res.status = b'200 OK'
303 300 res.headers[b'Content-Type'] = b'text/plain'
304 301 res.setbodybytes(_('command in frame must match command in URL'))
305 302 return True
306 303
307 304 res.status = b'200 OK'
308 305 res.headers[b'Content-Type'] = FRAMINGTYPE
309 306
310 307 try:
311 308 objs = dispatch(repo, proto, command['command'])
312 309
313 310 action, meta = reactor.oncommandresponsereadyobjects(
314 311 outstream, command['requestid'], objs)
315 312
316 313 except error.WireprotoCommandError as e:
317 314 action, meta = reactor.oncommanderror(
318 315 outstream, command['requestid'], e.message, e.messageargs)
319 316
320 317 except Exception as e:
321 318 action, meta = reactor.onservererror(
322 319 outstream, command['requestid'],
323 320 _('exception when invoking command: %s') %
324 321 stringutil.forcebytestr(e))
325 322
326 323 if action == 'sendframes':
327 324 res.setbodygen(meta['framegen'])
328 325 return True
329 326 elif action == 'noop':
330 327 return False
331 328 else:
332 329 raise error.ProgrammingError('unhandled event from reactor: %s' %
333 330 action)
334 331
335 332 def getdispatchrepo(repo, proto, command):
336 333 return repo.filtered('served')
337 334
338 335 def dispatch(repo, proto, command):
339 336 repo = getdispatchrepo(repo, proto, command)
340 337
341 338 func, spec = COMMANDS[command]
342 339 args = proto.getargs(spec)
343 340
344 341 return func(repo, proto, **pycompat.strkwargs(args))
345 342
346 343 @interfaceutil.implementer(wireprototypes.baseprotocolhandler)
347 344 class httpv2protocolhandler(object):
348 345 def __init__(self, req, ui, args=None):
349 346 self._req = req
350 347 self._ui = ui
351 348 self._args = args
352 349
353 350 @property
354 351 def name(self):
355 352 return HTTP_WIREPROTO_V2
356 353
357 354 def getargs(self, args):
358 355 # First look for args that were passed but aren't registered on this
359 356 # command.
360 357 extra = set(self._args) - set(args)
361 358 if extra:
362 359 raise error.WireprotoCommandError(
363 360 'unsupported argument to command: %s' %
364 361 ', '.join(sorted(extra)))
365 362
366 363 # And look for required arguments that are missing.
367 364 missing = {a for a in args if args[a]['required']} - set(self._args)
368 365
369 366 if missing:
370 367 raise error.WireprotoCommandError(
371 368 'missing required arguments: %s' % ', '.join(sorted(missing)))
372 369
373 370 # Now derive the arguments to pass to the command, taking into
374 371 # account the arguments specified by the client.
375 372 data = {}
376 373 for k, meta in sorted(args.items()):
377 374 # This argument wasn't passed by the client.
378 375 if k not in self._args:
379 376 data[k] = meta['default']()
380 377 continue
381 378
382 379 v = self._args[k]
383 380
384 381 # Sets may be expressed as lists. Silently normalize.
385 382 if meta['type'] == 'set' and isinstance(v, list):
386 383 v = set(v)
387 384
388 385 # TODO consider more/stronger type validation.
389 386
390 387 data[k] = v
391 388
392 389 return data
393 390
394 391 def getprotocaps(self):
395 392 # Protocol capabilities are currently not implemented for HTTP V2.
396 393 return set()
397 394
398 395 def getpayload(self):
399 396 raise NotImplementedError
400 397
401 398 @contextlib.contextmanager
402 399 def mayberedirectstdio(self):
403 400 raise NotImplementedError
404 401
405 402 def client(self):
406 403 raise NotImplementedError
407 404
408 405 def addcapabilities(self, repo, caps):
409 406 return caps
410 407
411 408 def checkperm(self, perm):
412 409 raise NotImplementedError
413 410
414 411 def httpv2apidescriptor(req, repo):
415 412 proto = httpv2protocolhandler(req, repo.ui)
416 413
417 414 return _capabilitiesv2(repo, proto)
418 415
419 416 def _capabilitiesv2(repo, proto):
420 417 """Obtain the set of capabilities for version 2 transports.
421 418
422 419 These capabilities are distinct from the capabilities for version 1
423 420 transports.
424 421 """
425 422 compression = []
426 423 for engine in wireprototypes.supportedcompengines(repo.ui, util.SERVERROLE):
427 424 compression.append({
428 425 b'name': engine.wireprotosupport().name,
429 426 })
430 427
431 428 caps = {
432 429 'commands': {},
433 430 'compression': compression,
434 431 'framingmediatypes': [FRAMINGTYPE],
435 432 'pathfilterprefixes': set(narrowspec.VALID_PREFIXES),
436 433 }
437 434
438 435 for command, entry in COMMANDS.items():
439 436 args = {}
440 437
441 438 for arg, meta in entry.args.items():
442 439 args[arg] = {
443 440 # TODO should this be a normalized type using CBOR's
444 441 # terminology?
445 442 b'type': meta['type'],
446 443 b'required': meta['required'],
447 444 }
448 445
449 446 if not meta['required']:
450 447 args[arg][b'default'] = meta['default']()
451 448
452 449 if meta['validvalues']:
453 450 args[arg][b'validvalues'] = meta['validvalues']
454 451
455 452 caps['commands'][command] = {
456 453 'args': args,
457 454 'permissions': [entry.permission],
458 455 }
459 456
460 457 if streamclone.allowservergeneration(repo):
461 458 caps['rawrepoformats'] = sorted(repo.requirements &
462 459 repo.supportedformats)
463 460
464 461 return proto.addcapabilities(repo, caps)
465 462
466 def builddeltarequests(store, nodes, haveparents):
467 """Build a series of revision delta requests against a backend store.
468
469 Returns a list of revision numbers in the order they should be sent
470 and a list of ``irevisiondeltarequest`` instances to be made against
471 the backend store.
472 """
473 # We sort and send nodes in DAG order because this is optimal for
474 # storage emission.
475 # TODO we may want a better storage API here - one where we can throw
476 # a list of nodes and delta preconditions over a figurative wall and
477 # have the storage backend figure it out for us.
478 revs = dagop.linearize({store.rev(n) for n in nodes}, store.parentrevs)
479
480 requests = []
481 seenrevs = set()
482
483 for rev in revs:
484 node = store.node(rev)
485 parentnodes = store.parents(node)
486 parentrevs = [store.rev(n) for n in parentnodes]
487 deltabaserev = store.deltaparent(rev)
488 deltabasenode = store.node(deltabaserev)
489
490 # The choice of whether to send a fulltext revision or a delta and
491 # what delta to send is governed by a few factors.
492 #
493 # To send a delta, we need to ensure the receiver is capable of
494 # decoding it. And that requires the receiver to have the base
495 # revision the delta is against.
496 #
497 # We can only guarantee the receiver has the base revision if
498 # a) we've already sent the revision as part of this group
499 # b) the receiver has indicated they already have the revision.
500 # And the mechanism for "b" is the client indicating they have
501 # parent revisions. So this means we can only send the delta if
502 # it is sent before or it is against a delta and the receiver says
503 # they have a parent.
504
505 # We can send storage delta if it is against a revision we've sent
506 # in this group.
507 if deltabaserev != nullrev and deltabaserev in seenrevs:
508 basenode = deltabasenode
509
510 # We can send storage delta if it is against a parent revision and
511 # the receiver indicates they have the parents.
512 elif (deltabaserev != nullrev and deltabaserev in parentrevs
513 and haveparents):
514 basenode = deltabasenode
515
516 # Otherwise the storage delta isn't appropriate. Fall back to
517 # using another delta, if possible.
518
519 # Use p1 if we've emitted it or receiver says they have it.
520 elif parentrevs[0] != nullrev and (
521 parentrevs[0] in seenrevs or haveparents):
522 basenode = parentnodes[0]
523
524 # Use p2 if we've emitted it or receiver says they have it.
525 elif parentrevs[1] != nullrev and (
526 parentrevs[1] in seenrevs or haveparents):
527 basenode = parentnodes[1]
528
529 # Nothing appropriate to delta against. Send the full revision.
530 else:
531 basenode = nullid
532
533 requests.append(changegroup.revisiondeltarequest(
534 node=node,
535 p1node=parentnodes[0],
536 p2node=parentnodes[1],
537 # Receiver deals with linknode resolution.
538 linknode=nullid,
539 basenode=basenode,
540 ))
541
542 seenrevs.add(rev)
543
544 return revs, requests
545
546 463 def wireprotocommand(name, args=None, permission='push'):
547 464 """Decorator to declare a wire protocol command.
548 465
549 466 ``name`` is the name of the wire protocol command being provided.
550 467
551 468 ``args`` is a dict defining arguments accepted by the command. Keys are
552 469 the argument name. Values are dicts with the following keys:
553 470
554 471 ``type``
555 472 The argument data type. Must be one of the following string
556 473 literals: ``bytes``, ``int``, ``list``, ``dict``, ``set``,
557 474 or ``bool``.
558 475
559 476 ``default``
560 477 A callable returning the default value for this argument. If not
561 478 specified, ``None`` will be the default value.
562 479
563 480 ``required``
564 481 Bool indicating whether the argument is required.
565 482
566 483 ``example``
567 484 An example value for this argument.
568 485
569 486 ``validvalues``
570 487 Set of recognized values for this argument.
571 488
572 489 ``permission`` defines the permission type needed to run this command.
573 490 Can be ``push`` or ``pull``. These roughly map to read-write and read-only,
574 491 respectively. Default is to assume command requires ``push`` permissions
575 492 because otherwise commands not declaring their permissions could modify
576 493 a repository that is supposed to be read-only.
577 494
578 495 Wire protocol commands are generators of objects to be serialized and
579 496 sent to the client.
580 497
581 498 If a command raises an uncaught exception, this will be translated into
582 499 a command error.
583 500 """
584 501 transports = {k for k, v in wireprototypes.TRANSPORTS.items()
585 502 if v['version'] == 2}
586 503
587 504 if permission not in ('push', 'pull'):
588 505 raise error.ProgrammingError('invalid wire protocol permission; '
589 506 'got %s; expected "push" or "pull"' %
590 507 permission)
591 508
592 509 if args is None:
593 510 args = {}
594 511
595 512 if not isinstance(args, dict):
596 513 raise error.ProgrammingError('arguments for version 2 commands '
597 514 'must be declared as dicts')
598 515
599 516 for arg, meta in args.items():
600 517 if arg == '*':
601 518 raise error.ProgrammingError('* argument name not allowed on '
602 519 'version 2 commands')
603 520
604 521 if not isinstance(meta, dict):
605 522 raise error.ProgrammingError('arguments for version 2 commands '
606 523 'must declare metadata as a dict')
607 524
608 525 if 'type' not in meta:
609 526 raise error.ProgrammingError('%s argument for command %s does not '
610 527 'declare type field' % (arg, name))
611 528
612 529 if meta['type'] not in ('bytes', 'int', 'list', 'dict', 'set', 'bool'):
613 530 raise error.ProgrammingError('%s argument for command %s has '
614 531 'illegal type: %s' % (arg, name,
615 532 meta['type']))
616 533
617 534 if 'example' not in meta:
618 535 raise error.ProgrammingError('%s argument for command %s does not '
619 536 'declare example field' % (arg, name))
620 537
621 538 if 'default' in meta and meta.get('required'):
622 539 raise error.ProgrammingError('%s argument for command %s is marked '
623 540 'as required but has a default value' %
624 541 (arg, name))
625 542
626 543 meta.setdefault('default', lambda: None)
627 544 meta.setdefault('required', False)
628 545 meta.setdefault('validvalues', None)
629 546
630 547 def register(func):
631 548 if name in COMMANDS:
632 549 raise error.ProgrammingError('%s command already registered '
633 550 'for version 2' % name)
634 551
635 552 COMMANDS[name] = wireprototypes.commandentry(
636 553 func, args=args, transports=transports, permission=permission)
637 554
638 555 return func
639 556
640 557 return register
641 558
642 559 @wireprotocommand('branchmap', permission='pull')
643 560 def branchmapv2(repo, proto):
644 561 yield {encoding.fromlocal(k): v
645 562 for k, v in repo.branchmap().iteritems()}
646 563
647 564 @wireprotocommand('capabilities', permission='pull')
648 565 def capabilitiesv2(repo, proto):
649 566 yield _capabilitiesv2(repo, proto)
650 567
651 568 @wireprotocommand(
652 569 'changesetdata',
653 570 args={
654 571 'noderange': {
655 572 'type': 'list',
656 573 'example': [[b'0123456...'], [b'abcdef...']],
657 574 },
658 575 'nodes': {
659 576 'type': 'list',
660 577 'example': [b'0123456...'],
661 578 },
662 579 'nodesdepth': {
663 580 'type': 'int',
664 581 'example': 10,
665 582 },
666 583 'fields': {
667 584 'type': 'set',
668 585 'default': set,
669 586 'example': {b'parents', b'revision'},
670 587 'validvalues': {b'bookmarks', b'parents', b'phase', b'revision'},
671 588 },
672 589 },
673 590 permission='pull')
674 591 def changesetdata(repo, proto, noderange, nodes, nodesdepth, fields):
675 592 # TODO look for unknown fields and abort when they can't be serviced.
676 593 # This could probably be validated by dispatcher using validvalues.
677 594
678 595 if noderange is None and nodes is None:
679 596 raise error.WireprotoCommandError(
680 597 'noderange or nodes must be defined')
681 598
682 599 if nodesdepth is not None and nodes is None:
683 600 raise error.WireprotoCommandError(
684 601 'nodesdepth requires the nodes argument')
685 602
686 603 if noderange is not None:
687 604 if len(noderange) != 2:
688 605 raise error.WireprotoCommandError(
689 606 'noderange must consist of 2 elements')
690 607
691 608 if not noderange[1]:
692 609 raise error.WireprotoCommandError(
693 610 'heads in noderange request cannot be empty')
694 611
695 612 cl = repo.changelog
696 613 hasnode = cl.hasnode
697 614
698 615 seen = set()
699 616 outgoing = []
700 617
701 618 if nodes is not None:
702 619 outgoing = [n for n in nodes if hasnode(n)]
703 620
704 621 if nodesdepth:
705 622 outgoing = [cl.node(r) for r in
706 623 repo.revs(b'ancestors(%ln, %d)', outgoing,
707 624 nodesdepth - 1)]
708 625
709 626 seen |= set(outgoing)
710 627
711 628 if noderange is not None:
712 629 if noderange[0]:
713 630 common = [n for n in noderange[0] if hasnode(n)]
714 631 else:
715 632 common = [nullid]
716 633
717 634 for n in discovery.outgoing(repo, common, noderange[1]).missing:
718 635 if n not in seen:
719 636 outgoing.append(n)
720 637 # Don't need to add to seen here because this is the final
721 638 # source of nodes and there should be no duplicates in this
722 639 # list.
723 640
724 641 seen.clear()
725 642 publishing = repo.publishing()
726 643
727 644 if outgoing:
728 645 repo.hook('preoutgoing', throw=True, source='serve')
729 646
730 647 yield {
731 648 b'totalitems': len(outgoing),
732 649 }
733 650
734 651 # The phases of nodes already transferred to the client may have changed
735 652 # since the client last requested data. We send phase-only records
736 653 # for these revisions, if requested.
737 654 if b'phase' in fields and noderange is not None:
738 655 # TODO skip nodes whose phase will be reflected by a node in the
739 656 # outgoing set. This is purely an optimization to reduce data
740 657 # size.
741 658 for node in noderange[0]:
742 659 yield {
743 660 b'node': node,
744 661 b'phase': b'public' if publishing else repo[node].phasestr()
745 662 }
746 663
747 664 nodebookmarks = {}
748 665 for mark, node in repo._bookmarks.items():
749 666 nodebookmarks.setdefault(node, set()).add(mark)
750 667
751 668 # It is already topologically sorted by revision number.
752 669 for node in outgoing:
753 670 d = {
754 671 b'node': node,
755 672 }
756 673
757 674 if b'parents' in fields:
758 675 d[b'parents'] = cl.parents(node)
759 676
760 677 if b'phase' in fields:
761 678 if publishing:
762 679 d[b'phase'] = b'public'
763 680 else:
764 681 ctx = repo[node]
765 682 d[b'phase'] = ctx.phasestr()
766 683
767 684 if b'bookmarks' in fields and node in nodebookmarks:
768 685 d[b'bookmarks'] = sorted(nodebookmarks[node])
769 686 del nodebookmarks[node]
770 687
771 688 followingmeta = []
772 689 followingdata = []
773 690
774 691 if b'revision' in fields:
775 692 revisiondata = cl.revision(node, raw=True)
776 693 followingmeta.append((b'revision', len(revisiondata)))
777 694 followingdata.append(revisiondata)
778 695
779 696 # TODO make it possible for extensions to wrap a function or register
780 697 # a handler to service custom fields.
781 698
782 699 if followingmeta:
783 700 d[b'fieldsfollowing'] = followingmeta
784 701
785 702 yield d
786 703
787 704 for extra in followingdata:
788 705 yield extra
789 706
790 707 # If requested, send bookmarks from nodes that didn't have revision
791 708 # data sent so receiver is aware of any bookmark updates.
792 709 if b'bookmarks' in fields:
793 710 for node, marks in sorted(nodebookmarks.iteritems()):
794 711 yield {
795 712 b'node': node,
796 713 b'bookmarks': sorted(marks),
797 714 }
798 715
799 716 class FileAccessError(Exception):
800 717 """Represents an error accessing a specific file."""
801 718
802 719 def __init__(self, path, msg, args):
803 720 self.path = path
804 721 self.msg = msg
805 722 self.args = args
806 723
807 724 def getfilestore(repo, proto, path):
808 725 """Obtain a file storage object for use with wire protocol.
809 726
810 727 Exists as a standalone function so extensions can monkeypatch to add
811 728 access control.
812 729 """
813 730 # This seems to work even if the file doesn't exist. So catch
814 731 # "empty" files and return an error.
815 732 fl = repo.file(path)
816 733
817 734 if not len(fl):
818 735 raise FileAccessError(path, 'unknown file: %s', (path,))
819 736
820 737 return fl
821 738
822 739 @wireprotocommand(
823 740 'filedata',
824 741 args={
825 742 'haveparents': {
826 743 'type': 'bool',
827 744 'default': lambda: False,
828 745 'example': True,
829 746 },
830 747 'nodes': {
831 748 'type': 'list',
832 749 'required': True,
833 750 'example': [b'0123456...'],
834 751 },
835 752 'fields': {
836 753 'type': 'set',
837 754 'default': set,
838 755 'example': {b'parents', b'revision'},
839 756 'validvalues': {b'parents', b'revision'},
840 757 },
841 758 'path': {
842 759 'type': 'bytes',
843 760 'required': True,
844 761 'example': b'foo.txt',
845 762 }
846 763 },
847 764 permission='pull')
848 765 def filedata(repo, proto, haveparents, nodes, fields, path):
849 766 try:
850 767 # Extensions may wish to access the protocol handler.
851 768 store = getfilestore(repo, proto, path)
852 769 except FileAccessError as e:
853 770 raise error.WireprotoCommandError(e.msg, e.args)
854 771
855 772 # Validate requested nodes.
856 773 for node in nodes:
857 774 try:
858 775 store.rev(node)
859 776 except error.LookupError:
860 777 raise error.WireprotoCommandError('unknown file node: %s',
861 778 (hex(node),))
862 779
863 revs, requests = builddeltarequests(store, nodes, haveparents)
780 revisions = store.emitrevisions(nodes,
781 revisiondata=b'revision' in fields,
782 assumehaveparentrevisions=haveparents)
864 783
865 784 yield {
866 b'totalitems': len(revs),
785 b'totalitems': len(nodes),
867 786 }
868 787
869 if b'revision' in fields:
870 deltas = store.emitrevisiondeltas(requests)
871 else:
872 deltas = None
873
874 for rev in revs:
875 node = store.node(rev)
876
877 if deltas is not None:
878 delta = next(deltas)
879 else:
880 delta = None
881
788 for revision in revisions:
882 789 d = {
883 b'node': node,
790 b'node': revision.node,
884 791 }
885 792
886 793 if b'parents' in fields:
887 d[b'parents'] = store.parents(node)
794 d[b'parents'] = [revision.p1node, revision.p2node]
888 795
889 796 followingmeta = []
890 797 followingdata = []
891 798
892 799 if b'revision' in fields:
893 assert delta is not None
894 assert delta.flags == 0
895 assert d[b'node'] == delta.node
896
897 if delta.revision is not None:
898 followingmeta.append((b'revision', len(delta.revision)))
899 followingdata.append(delta.revision)
800 if revision.revision is not None:
801 followingmeta.append((b'revision', len(revision.revision)))
802 followingdata.append(revision.revision)
900 803 else:
901 d[b'deltabasenode'] = delta.basenode
902 followingmeta.append((b'delta', len(delta.delta)))
903 followingdata.append(delta.delta)
804 d[b'deltabasenode'] = revision.basenode
805 followingmeta.append((b'delta', len(revision.delta)))
806 followingdata.append(revision.delta)
904 807
905 808 if followingmeta:
906 809 d[b'fieldsfollowing'] = followingmeta
907 810
908 811 yield d
909 812
910 813 for extra in followingdata:
911 814 yield extra
912 815
913 if deltas is not None:
914 try:
915 next(deltas)
916 raise error.ProgrammingError('should not have more deltas')
917 except GeneratorExit:
918 pass
919
920 816 @wireprotocommand(
921 817 'heads',
922 818 args={
923 819 'publiconly': {
924 820 'type': 'bool',
925 821 'default': lambda: False,
926 822 'example': False,
927 823 },
928 824 },
929 825 permission='pull')
930 826 def headsv2(repo, proto, publiconly):
931 827 if publiconly:
932 828 repo = repo.filtered('immutable')
933 829
934 830 yield repo.heads()
935 831
936 832 @wireprotocommand(
937 833 'known',
938 834 args={
939 835 'nodes': {
940 836 'type': 'list',
941 837 'default': list,
942 838 'example': [b'deadbeef'],
943 839 },
944 840 },
945 841 permission='pull')
946 842 def knownv2(repo, proto, nodes):
947 843 result = b''.join(b'1' if n else b'0' for n in repo.known(nodes))
948 844 yield result
949 845
950 846 @wireprotocommand(
951 847 'listkeys',
952 848 args={
953 849 'namespace': {
954 850 'type': 'bytes',
955 851 'required': True,
956 852 'example': b'ns',
957 853 },
958 854 },
959 855 permission='pull')
960 856 def listkeysv2(repo, proto, namespace):
961 857 keys = repo.listkeys(encoding.tolocal(namespace))
962 858 keys = {encoding.fromlocal(k): encoding.fromlocal(v)
963 859 for k, v in keys.iteritems()}
964 860
965 861 yield keys
966 862
967 863 @wireprotocommand(
968 864 'lookup',
969 865 args={
970 866 'key': {
971 867 'type': 'bytes',
972 868 'required': True,
973 869 'example': b'foo',
974 870 },
975 871 },
976 872 permission='pull')
977 873 def lookupv2(repo, proto, key):
978 874 key = encoding.tolocal(key)
979 875
980 876 # TODO handle exception.
981 877 node = repo.lookup(key)
982 878
983 879 yield node
984 880
985 881 @wireprotocommand(
986 882 'manifestdata',
987 883 args={
988 884 'nodes': {
989 885 'type': 'list',
990 886 'required': True,
991 887 'example': [b'0123456...'],
992 888 },
993 889 'haveparents': {
994 890 'type': 'bool',
995 891 'default': lambda: False,
996 892 'example': True,
997 893 },
998 894 'fields': {
999 895 'type': 'set',
1000 896 'default': set,
1001 897 'example': {b'parents', b'revision'},
1002 898 'validvalues': {b'parents', b'revision'},
1003 899 },
1004 900 'tree': {
1005 901 'type': 'bytes',
1006 902 'required': True,
1007 903 'example': b'',
1008 904 },
1009 905 },
1010 906 permission='pull')
1011 907 def manifestdata(repo, proto, haveparents, nodes, fields, tree):
1012 908 store = repo.manifestlog.getstorage(tree)
1013 909
1014 910 # Validate the node is known and abort on unknown revisions.
1015 911 for node in nodes:
1016 912 try:
1017 913 store.rev(node)
1018 914 except error.LookupError:
1019 915 raise error.WireprotoCommandError(
1020 916 'unknown node: %s', (node,))
1021 917
1022 revs, requests = builddeltarequests(store, nodes, haveparents)
918 revisions = store.emitrevisions(nodes,
919 revisiondata=b'revision' in fields,
920 assumehaveparentrevisions=haveparents)
1023 921
1024 922 yield {
1025 b'totalitems': len(revs),
923 b'totalitems': len(nodes),
1026 924 }
1027 925
1028 if b'revision' in fields:
1029 deltas = store.emitrevisiondeltas(requests)
1030 else:
1031 deltas = None
1032
1033 for rev in revs:
1034 node = store.node(rev)
1035
1036 if deltas is not None:
1037 delta = next(deltas)
1038 else:
1039 delta = None
1040
926 for revision in revisions:
1041 927 d = {
1042 b'node': node,
928 b'node': revision.node,
1043 929 }
1044 930
1045 931 if b'parents' in fields:
1046 d[b'parents'] = store.parents(node)
932 d[b'parents'] = [revision.p1node, revision.p2node]
1047 933
1048 934 followingmeta = []
1049 935 followingdata = []
1050 936
1051 937 if b'revision' in fields:
1052 assert delta is not None
1053 assert delta.flags == 0
1054 assert d[b'node'] == delta.node
1055
1056 if delta.revision is not None:
1057 followingmeta.append((b'revision', len(delta.revision)))
1058 followingdata.append(delta.revision)
938 if revision.revision is not None:
939 followingmeta.append((b'revision', len(revision.revision)))
940 followingdata.append(revision.revision)
1059 941 else:
1060 d[b'deltabasenode'] = delta.basenode
1061 followingmeta.append((b'delta', len(delta.delta)))
1062 followingdata.append(delta.delta)
942 d[b'deltabasenode'] = revision.basenode
943 followingmeta.append((b'delta', len(revision.delta)))
944 followingdata.append(revision.delta)
1063 945
1064 946 if followingmeta:
1065 947 d[b'fieldsfollowing'] = followingmeta
1066 948
1067 949 yield d
1068 950
1069 951 for extra in followingdata:
1070 952 yield extra
1071 953
1072 if deltas is not None:
1073 try:
1074 next(deltas)
1075 raise error.ProgrammingError('should not have more deltas')
1076 except GeneratorExit:
1077 pass
1078
1079 954 @wireprotocommand(
1080 955 'pushkey',
1081 956 args={
1082 957 'namespace': {
1083 958 'type': 'bytes',
1084 959 'required': True,
1085 960 'example': b'ns',
1086 961 },
1087 962 'key': {
1088 963 'type': 'bytes',
1089 964 'required': True,
1090 965 'example': b'key',
1091 966 },
1092 967 'old': {
1093 968 'type': 'bytes',
1094 969 'required': True,
1095 970 'example': b'old',
1096 971 },
1097 972 'new': {
1098 973 'type': 'bytes',
1099 974 'required': True,
1100 975 'example': 'new',
1101 976 },
1102 977 },
1103 978 permission='push')
1104 979 def pushkeyv2(repo, proto, namespace, key, old, new):
1105 980 # TODO handle ui output redirection
1106 981 yield repo.pushkey(encoding.tolocal(namespace),
1107 982 encoding.tolocal(key),
1108 983 encoding.tolocal(old),
1109 984 encoding.tolocal(new))
General Comments 0
You need to be logged in to leave comments. Login now