##// END OF EJS Templates
wireprotoserver: convert ErrorResponse to bytes...
Matt Harbison -
r47524:71443f74 stable
parent child Browse files
Show More
@@ -1,856 +1,859 b''
1 1 # Copyright 21 May 2005 - (c) 2005 Jake Edge <jake@edge2.net>
2 2 # Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
3 3 #
4 4 # This software may be used and distributed according to the terms of the
5 5 # GNU General Public License version 2 or any later version.
6 6
7 7 from __future__ import absolute_import
8 8
9 9 import contextlib
10 10 import struct
11 11 import threading
12 12
13 13 from .i18n import _
14 14 from . import (
15 15 encoding,
16 16 error,
17 17 pycompat,
18 18 util,
19 19 wireprototypes,
20 20 wireprotov1server,
21 21 wireprotov2server,
22 22 )
23 23 from .interfaces import util as interfaceutil
24 24 from .utils import (
25 25 cborutil,
26 26 compression,
27 stringutil,
27 28 )
28 29
# Local aliases for helpers exposed by the util module.
stringio = util.stringio

urlerr = util.urlerr
urlreq = util.urlreq

# Status code that _callhttp() passes to its setresponse() helper for every
# successfully produced wire protocol reply.
HTTP_OK = 200

# Content-Type values used for HTTP version 1 wire protocol responses:
# HGTYPE is the legacy media type, HGTYPE2 is used when the client
# advertises "0.2" in its protocol capabilities, and HGERRTYPE marks
# out-of-band error replies.
HGTYPE = b'application/mercurial-0.1'
HGTYPE2 = b'application/mercurial-0.2'
HGERRTYPE = b'application/hg-error'

# SSH protocol names, re-exported from wireprototypes.
SSHV1 = wireprototypes.SSHV1
SSHV2 = wireprototypes.SSHV2
42 43
43 44
def decodevaluefromheaders(req, headerprefix):
    """Reassemble a value that was split across numbered request headers.

    Headers named ``<headerprefix>-1``, ``<headerprefix>-2``, ... are looked
    up in order on ``req.headers`` until one is missing. Each piece is run
    through ``pycompat.bytesurl`` and the pieces are concatenated.

    Returns the value as a bytes, not a str (empty if no header exists).
    """
    pieces = []
    index = 1
    piece = req.headers.get(b'%s-%d' % (headerprefix, index))
    while piece is not None:
        pieces.append(pycompat.bytesurl(piece))
        index += 1
        piece = req.headers.get(b'%s-%d' % (headerprefix, index))

    return b''.join(pieces)
59 60
60 61
@interfaceutil.implementer(wireprototypes.baseprotocolhandler)
class httpv1protocolhandler(object):
    """Protocol handler for version 1 of the HTTP wire protocol.

    Wraps a parsed request (``req``), a ``ui`` object and a permission
    checking callable (``checkperm``, invoked with the permission name).
    """

    def __init__(self, req, ui, checkperm):
        self._req, self._ui, self._checkperm = req, ui, checkperm
        # Filled in lazily by getprotocaps().
        self._protocaps = None

    @property
    def name(self):
        """Name identifying this transport."""
        return b'http-v1'

    def getargs(self, args):
        """Return the values of the space-delimited argument names in ``args``.

        The special name ``*`` yields a dict of every supplied argument
        that is neither ``cmd`` nor explicitly named in ``args``. Only the
        first value of each argument is used.
        """
        supplied = self._args()
        wanted = args.split()
        resolved = {}
        for argname in wanted:
            if argname != b'*':
                resolved[argname] = supplied[argname][0]
                continue

            resolved[b'*'] = {
                other: supplied[other][0]
                for other in supplied.keys()
                if other != b'cmd' and other not in wanted
            }
        return [resolved[argname] for argname in wanted]

    def _args(self):
        """Collect the request arguments as a dict mapping name to values.

        Query string parameters come first. They are then updated either
        from the first ``X-HgArgs-Post`` bytes of the request body (when
        that header holds a non-zero length) or from the ``X-HgArg-<N>``
        headers.
        """
        req = self._req
        merged = req.qsparams.asdictoflists()
        postlen = int(req.headers.get(b'X-HgArgs-Post', 0))
        if postlen:
            encoded = req.bodyfh.read(postlen)
        else:
            encoded = decodevaluefromheaders(req, b'X-HgArg')

        merged.update(urlreq.parseqs(encoded, keep_blank_values=True))
        return merged

    def getprotocaps(self):
        """Return the set of space-delimited ``X-HgProto`` values (cached)."""
        caps = self._protocaps
        if caps is None:
            raw = decodevaluefromheaders(self._req, b'X-HgProto')
            caps = self._protocaps = set(raw.split(b' '))
        return caps

    def getpayload(self):
        """Return an iterator over the chunks of the request payload."""
        headers = self._req.headers

        # Existing clients *always* send Content-Length.
        total = int(headers[b'Content-Length'])

        # If httppostargs is used, the arguments already consumed part of
        # the body, so only the remainder is payload.
        consumed = int(headers.get(b'X-HgArgs-Post', 0))
        return util.filechunkiter(self._req.bodyfh, limit=total - consumed)

    @contextlib.contextmanager
    def mayberedirectstdio(self):
        """Capture ``ui.fout`` and ``ui.ferr`` into a single buffer.

        Yields the buffer; the previous streams are put back on exit.
        """
        ui = self._ui
        saved = (ui.fout, ui.ferr)

        captured = util.stringio()

        try:
            ui.fout = ui.ferr = captured
            yield captured
        finally:
            ui.fout, ui.ferr = saved

    def client(self):
        """Return a bytes string describing the remote client."""
        req = self._req
        scheme = req.urlscheme
        host = urlreq.quote(req.remotehost or b'')
        user = urlreq.quote(req.remoteuser or b'')
        return b'remote:%s:%s:%s' % (scheme, host, user)

    def addcapabilities(self, repo, caps):
        """Append the HTTP-specific capabilities to ``caps`` and return it."""
        ui = repo.ui

        caps.append(b'batch')
        caps.append(
            b'httpheader=%d' % ui.configint(b'server', b'maxhttpheaderlen')
        )
        if ui.configbool(b'experimental', b'httppostargs'):
            caps.append(b'httppostargs')

        # FUTURE advertise 0.2rx once support is implemented
        # FUTURE advertise minrx and mintx after consulting config option
        caps.append(b'httpmediatype=0.1rx,0.1tx,0.2tx')

        engines = wireprototypes.supportedcompengines(
            ui, compression.SERVERROLE
        )
        if engines:
            quoted = [
                urlreq.quote(engine.wireprotosupport().name)
                for engine in engines
            ]
            caps.append(b'compression=%s' % b','.join(quoted))

        return caps

    def checkperm(self, perm):
        """Delegate to the permission checker given at construction time."""
        check = self._checkperm
        return check(perm)
166 167
167 168
# This hook is kept mainly so that extensions such as remotefilelog can
# turn off a kludgey legacy method for http only. As of early 2018 no other
# users are known, so it can hopefully be dropped if remotefilelog becomes
# a first-party extension.
def iscmd(cmd):
    """Report whether ``cmd`` is a registered version 1 wire command."""
    known = wireprotov1server.commands
    return cmd in known
174 175
175 176
def handlewsgirequest(rctx, req, res, checkperm):
    """Possibly process a wire protocol request.

    If the current request is a wire protocol request, the request is
    processed by this function.

    ``rctx`` provides the repository as ``rctx.repo``.
    ``req`` is a ``parsedrequest`` instance.
    ``res`` is a ``wsgiresponse`` instance.
    ``checkperm`` is called as ``checkperm(rctx, req, perm)``.

    Returns a bool indicating if the request was serviced. If set, the caller
    should stop processing the request, as a response has already been issued.
    """
    # Avoid cycle involving hg module.
    from .hgweb import common as hgwebcommon

    repo = rctx.repo

    # HTTP version 1 wire protocol requests are denoted by a "cmd" query
    # string parameter. If it isn't present, this isn't a wire protocol
    # request.
    if b'cmd' not in req.qsparams:
        return False

    cmd = req.qsparams[b'cmd']

    # The "cmd" request parameter is used by both the wire protocol and hgweb.
    # While not all wire protocol commands are available for all transports,
    # if we see a "cmd" value that resembles a known wire protocol command, we
    # route it to a protocol handler. This is better than routing possible
    # wire protocol requests to hgweb because it prevents hgweb from using
    # known wire protocol commands and it is less confusing for machine
    # clients.
    if not iscmd(cmd):
        return False

    # The "cmd" query string argument is only valid on the root path of the
    # repo. e.g. ``/?cmd=foo``, ``/repo?cmd=foo``. URL paths within the repo
    # like ``/blah?cmd=foo`` are not allowed. So don't recognize the request
    # in this case. We send an HTTP 404 for backwards compatibility reasons.
    if req.dispatchpath:
        res.status = hgwebcommon.statusmessage(404)
        res.headers[b'Content-Type'] = HGTYPE
        # TODO This is not a good response to issue for this request. This
        # is mostly for BC for now.
        res.setbodybytes(b'0\n%s\n' % b'Not Found')
        return True

    proto = httpv1protocolhandler(
        req, repo.ui, lambda perm: checkperm(rctx, req, perm)
    )

    # The permissions checker should be the only thing that can raise an
    # ErrorResponse. It is kind of a layer violation to catch an hgweb
    # exception here. So consider refactoring into a exception type that
    # is associated with the wire protocol.
    try:
        _callhttp(repo, req, res, proto, cmd)
    except hgwebcommon.ErrorResponse as e:
        for k, v in e.headers:
            res.headers[k] = v

        # Convert the exception to bytes exactly once and reuse it for both
        # the status line and the body, so each is set a single time.
        message = stringutil.forcebytestr(e)
        res.status = hgwebcommon.statusmessage(e.code, message)
        # TODO This response body assumes the failed command was
        # "unbundle." That assumption is not always valid.
        res.setbodybytes(b'0\n%s\n' % message)

    return True
242 245
243 246
def _availableapis(repo):
    """Return the set of API names whose config option is enabled.

    Each entry of ``API_HANDLERS`` names a ``(section, option)`` pair under
    its ``config`` key; the API is available when ``repo.ui.configbool``
    is true for that pair.
    """
    enabled = set()

    for name in API_HANDLERS:
        section, option = API_HANDLERS[name][b'config']
        if not repo.ui.configbool(section, option):
            continue
        enabled.add(name)

    return enabled
255 258
256 259
def handlewsgiapirequest(rctx, req, res, checkperm):
    """Handle requests to /api/*.

    Answers directly with a plain text response when the API server is
    disabled, when the bare ``/api/`` listing is requested, or when the
    named API is unknown or not enabled. Otherwise the request is handed
    to the handler registered for that API in ``API_HANDLERS``.
    """
    assert req.dispatchparts[0] == b'api'

    def plaintext(status, body):
        # Every response issued directly by this function is text/plain.
        res.status = status
        res.headers[b'Content-Type'] = b'text/plain'
        res.setbodybytes(body)

    repo = rctx.repo

    # This whole URL space is experimental for now. But we want to
    # reserve the URL space. So, 404 all URLs if the feature isn't enabled.
    if not repo.ui.configbool(b'experimental', b'web.apiserver'):
        plaintext(
            b'404 Not Found',
            _(b'Experimental API server endpoint not enabled'),
        )
        return

    # The URL space is /api/<protocol>/*. The structure of URLs under varies
    # by <protocol>.

    enabled = _availableapis(repo)

    # Requests to /api/ list available APIs.
    if req.dispatchparts == [b'api']:
        listing = [
            _(
                b'APIs can be accessed at /api/<name>, where <name> can be '
                b'one of the following:\n'
            )
        ]
        if enabled:
            listing.extend(sorted(enabled))
        else:
            listing.append(_(b'(no available APIs)\n'))
        plaintext(b'200 OK', b'\n'.join(listing))
        return

    apiname = req.dispatchparts[1]

    if apiname not in API_HANDLERS:
        plaintext(
            b'404 Not Found',
            _(b'Unknown API: %s\nKnown APIs: %s')
            % (apiname, b', '.join(sorted(enabled))),
        )
        return

    if apiname not in enabled:
        plaintext(b'404 Not Found', _(b'API %s not enabled\n') % apiname)
        return

    handler = API_HANDLERS[apiname][b'handler']
    handler(rctx, req, res, checkperm, req.dispatchparts[2:])
313 316
314 317
# Registry of API name -> metadata, so that custom APIs can be registered.
# Each metadata dict has these keys:
#
# config
#    (section, option) pair naming the config option that controls whether
#    the service is enabled.
# handler
#    Callable receiving (rctx, req, res, checkperm, urlparts) that is called
#    when a request to this API is received.
# apidescriptor
#    Callable receiving (req, repo) that is called to obtain an API
#    descriptor for this service. The response must be serializable to CBOR.
API_HANDLERS = {}
API_HANDLERS[wireprotov2server.HTTP_WIREPROTO_V2] = {
    b'config': (b'experimental', b'web.api.http-v2'),
    b'handler': wireprotov2server.handlehttpv2request,
    b'apidescriptor': wireprotov2server.httpv2apidescriptor,
}
333 336
334 337
def _httpresponsetype(ui, proto, prefer_uncompressed):
    """Pick the response media type and compression settings.

    The decision is based on the protocol capabilities reported by
    ``proto`` and on the server configuration in ``ui``.

    Returns a tuple of (mediatype, compengine, engineopts).
    """
    speaks02 = b'0.2' in proto.getprotocaps()

    # All clients are expected to support uncompressed data.
    if speaks02 and prefer_uncompressed:
        return HGTYPE2, compression._noopengine(), {}

    if speaks02:
        # Now find an agreed upon compression format.
        clientformats = wireprotov1server.clientcompressionsupport(proto)
        candidates = wireprototypes.supportedcompengines(
            ui, compression.SERVERROLE
        )
        for candidate in candidates:
            if candidate.wireprotosupport().name not in clientformats:
                continue

            settings = {}
            level = ui.configint(b'server', b'%slevel' % candidate.name())
            if level is not None:
                settings[b'level'] = level

            return HGTYPE2, candidate, settings

        # No mutually supported compression format. Fall back to the
        # legacy protocol.

    # Don't allow untrusted settings because disabling compression or
    # setting a very high compression level could lead to flooding
    # the server's network or CPU.
    zlibsettings = {b'level': ui.configint(b'server', b'zliblevel')}
    return HGTYPE, util.compengines[b'zlib'], zlibsettings
369 372
370 373
def processcapabilitieshandshake(repo, req, res, proto):
    """Called during a ?cmd=capabilities request.

    When the client advertises support for a newer protocol, a CBOR
    response describing the available services is written to ``res``.

    Returns True if that response was issued and False when the request
    should fall through to the regular capabilities handling.
    """
    # Fall back to old behavior unless the API server is enabled.
    if not repo.ui.configbool(b'experimental', b'web.apiserver'):
        return False

    upgradevalue = decodevaluefromheaders(req, b'X-HgUpgrade')
    protovalue = decodevaluefromheaders(req, b'X-HgProto')
    if not (upgradevalue and protovalue):
        return False

    # We currently only support CBOR responses.
    if b'cbor' not in set(protovalue.split(b' ')):
        return False

    requested = set(upgradevalue.split())
    descriptors = {}

    for apiname in sorted(requested & _availableapis(repo)):
        describe = API_HANDLERS[apiname].get(b'apidescriptor')
        if describe:
            descriptors[apiname] = describe(req, repo)

    v1caps = wireprotov1server.dispatch(repo, proto, b'capabilities')
    assert isinstance(v1caps, wireprototypes.bytesresponse)

    payload = {
        # TODO allow this to be configurable.
        b'apibase': b'api/',
        b'apis': descriptors,
        b'v1capabilities': v1caps.data,
    }

    res.status = b'200 OK'
    res.headers[b'Content-Type'] = b'application/mercurial-cbor'
    res.setbodybytes(b''.join(cborutil.streamencode(payload)))

    return True
418 421
419 422
def _callhttp(repo, req, res, proto, cmd):
    """Run wire protocol command ``cmd`` and record its result on ``res``.

    The permission attached to the command is checked through
    ``proto.checkperm`` before the command is dispatched. The response
    object returned by the command decides the media type and body that
    are set on ``res``.

    Raises error.ProgrammingError for an unrecognized response object.
    """
    # Avoid cycle involving hg module.
    from .hgweb import common as hgwebcommon

    def genversion2(gen, engine, engineopts):
        # application/mercurial-0.2 always sends a payload header
        # identifying the compression engine.
        enginename = engine.wireprotosupport().name
        namelen = len(enginename)
        assert 0 < namelen < 256
        yield struct.pack(b'B', namelen)
        yield enginename

        for piece in gen:
            yield piece

    def setresponse(code, contenttype, bodybytes=None, bodygen=None):
        res.status = (
            b'200 Script output follows'
            if code == HTTP_OK
            else hgwebcommon.statusmessage(code)
        )

        res.headers[b'Content-Type'] = contenttype

        if bodybytes is not None:
            res.setbodybytes(bodybytes)
        if bodygen is not None:
            res.setbodygen(bodygen)

    v1commands = wireprotov1server.commands

    if not v1commands.commandavailable(cmd, proto):
        setresponse(
            HTTP_OK,
            HGERRTYPE,
            _(
                b'requested wire protocol command is not available over '
                b'HTTP'
            ),
        )
        return

    proto.checkperm(v1commands[cmd].permission)

    # Possibly handle a modern client wanting to switch protocols.
    if cmd == b'capabilities' and processcapabilitieshandshake(
        repo, req, res, proto
    ):
        return

    rsp = wireprotov1server.dispatch(repo, proto, cmd)

    if isinstance(rsp, bytes):
        setresponse(HTTP_OK, HGTYPE, bodybytes=rsp)
        return

    if isinstance(rsp, wireprototypes.bytesresponse):
        setresponse(HTTP_OK, HGTYPE, bodybytes=rsp.data)
        return

    if isinstance(rsp, wireprototypes.streamreslegacy):
        setresponse(HTTP_OK, HGTYPE, bodygen=rsp.gen)
        return

    if isinstance(rsp, wireprototypes.streamres):
        # This code for compression should not be streamres specific. It
        # is here because we only compress streamres at the moment.
        mediatype, engine, engineopts = _httpresponsetype(
            repo.ui, proto, rsp.prefer_uncompressed
        )
        body = engine.compressstream(rsp.gen, engineopts)

        if mediatype == HGTYPE2:
            body = genversion2(body, engine, engineopts)

        setresponse(HTTP_OK, mediatype, bodygen=body)
        return

    if isinstance(rsp, wireprototypes.pushres):
        setresponse(
            HTTP_OK, HGTYPE, bodybytes=b'%d\n%s' % (rsp.res, rsp.output)
        )
        return

    if isinstance(rsp, wireprototypes.pusherr):
        failure = b'0\n%s\n' % rsp.res
        res.drain = True
        setresponse(HTTP_OK, HGTYPE, bodybytes=failure)
        return

    if isinstance(rsp, wireprototypes.ooberror):
        setresponse(HTTP_OK, HGERRTYPE, bodybytes=rsp.message)
        return

    raise error.ProgrammingError(b'hgweb.protocol internal failure', rsp)
501 504
502 505
def _sshv1respondbytes(fout, value):
    """Send a bytes response for protocol version 1.

    Writes the length of ``value`` on its own line, then ``value`` itself,
    and flushes ``fout``.
    """
    header = b'%d\n' % len(value)
    for piece in (header, value):
        fout.write(piece)
    fout.flush()
508 511
509 512
def _sshv1respondstream(fout, source):
    """Write every chunk produced by ``source.gen`` to ``fout``, then flush."""
    for piece in source.gen:
        fout.write(piece)
    fout.flush()
515 518
516 519
def _sshv1respondooberror(fout, ferr, rsp):
    """Report an out-of-band error for protocol version 1.

    The message ``rsp`` followed by a ``-`` line goes to ``ferr``; a bare
    newline goes to ``fout``. Each stream is flushed after its write.
    """
    for stream, data in ((ferr, b'%s\n-\n' % rsp), (fout, b'\n')):
        stream.write(data)
        stream.flush()
522 525
523 526
@interfaceutil.implementer(wireprototypes.baseprotocolhandler)
class sshv1protocolhandler(object):
    """Handler for requests serviced via version 1 of the SSH protocol."""

    def __init__(self, ui, fin, fout):
        self._ui, self._fin, self._fout = ui, fin, fout
        self._protocaps = set()

    @property
    def name(self):
        """Name identifying this transport."""
        return wireprototypes.SSHV1

    def getargs(self, args):
        """Read the arguments named in ``args`` from the input stream.

        One ``<name> <length>`` line is expected per requested name,
        followed by ``<length>`` bytes of value. For the special name
        ``*`` the length is instead a count of nested name/value entries,
        which are collected into a dict.

        Raises error.Abort if a name that was not requested is received.
        """
        fin = self._fin
        wanted = args.split()
        received = {}
        for _slot in wanted:
            name, size = fin.readline()[:-1].split()
            if name not in wanted:
                raise error.Abort(_(b"unexpected parameter %r") % name)

            if name != b'*':
                received[name] = fin.read(int(size))
                continue

            extras = {}
            for _entry in pycompat.xrange(int(size)):
                subname, subsize = fin.readline()[:-1].split()
                extras[subname] = fin.read(int(subsize))
            received[b'*'] = extras
        return [received[key] for key in wanted]

    def getprotocaps(self):
        """Return the set of protocol capabilities held by this handler."""
        return self._protocaps

    def getpayload(self):
        """Yield the chunks of the payload sent by the client."""
        # We initially send an empty response. This tells the client it is
        # OK to start sending data. If a client sees any other response, it
        # interprets it as an error.
        _sshv1respondbytes(self._fout, b'')

        # The file is in the form:
        #
        # <chunk size>\n<chunk>
        # ...
        # 0\n
        while True:
            size = int(self._fin.readline())
            if not size:
                break
            yield self._fin.read(size)

    @contextlib.contextmanager
    def mayberedirectstdio(self):
        """Context manager that performs no redirection and yields None."""
        yield None

    def client(self):
        """Return a bytes string describing the remote client."""
        sshclient = encoding.environ.get(b'SSH_CLIENT', b'')
        return b'remote:ssh:' + sshclient.split(b' ', 1)[0]

    def addcapabilities(self, repo, caps):
        """Append the SSH-specific capabilities to ``caps`` and return it."""
        if self.name == wireprototypes.SSHV1:
            caps.append(b'protocaps')
        caps.append(b'batch')
        return caps

    def checkperm(self, perm):
        """Accept every permission; no check is performed here."""
        return None
594 597
595 598
class sshv2protocolhandler(sshv1protocolhandler):
    """Protocol handler for version 2 of the SSH protocol.

    Differs from the version 1 handler only in its name and in adding no
    transport-specific capabilities.
    """

    @property
    def name(self):
        """Name identifying this transport."""
        return wireprototypes.SSHV2

    def addcapabilities(self, repo, caps):
        """Return ``caps`` untouched."""
        return caps
605 608
606 609
def _runsshserver(ui, repo, fin, fout, ev):
    """Serve SSH wire protocol requests until shutdown or ``ev`` is set.

    ``fin`` and ``fout`` are the streams requests are read from and
    responses are written to. ``ev`` is checked with ``ev.is_set()`` before
    each state transition.

    Raises error.ProgrammingError if a command returns an unrecognized
    response object or if an unknown state is reached.
    """
    # This function operates like a state machine of sorts. The following
    # states are defined:
    #
    # protov1-serving
    #    Server is in protocol version 1 serving mode. Commands arrive on
    #    new lines. These commands are processed in this state, one command
    #    after the other.
    #
    # protov2-serving
    #    Server is in protocol version 2 serving mode.
    #
    # upgrade-initial
    #    The server is going to process an upgrade request.
    #
    # upgrade-v2-filter-legacy-handshake
    #    The protocol is being upgraded to version 2. The server is expecting
    #    the legacy handshake from version 1.
    #
    # upgrade-v2-finish
    #    The upgrade to version 2 of the protocol is imminent.
    #
    # shutdown
    #    The server is shutting down, possibly in reaction to a client event.
    #
    # And here are their transitions:
    #
    # protov1-serving -> shutdown
    #    When server receives an empty request or encounters another
    #    error.
    #
    # protov1-serving -> upgrade-initial
    #    An upgrade request line was seen.
    #
    # upgrade-initial -> upgrade-v2-filter-legacy-handshake
    #    Upgrade to version 2 in progress. Server is expecting to
    #    process a legacy handshake.
    #
    # upgrade-v2-filter-legacy-handshake -> shutdown
    #    Client did not fulfill upgrade handshake requirements.
    #
    # upgrade-v2-filter-legacy-handshake -> upgrade-v2-finish
    #    Client fulfilled version 2 upgrade requirements. Finishing that
    #    upgrade.
    #
    # upgrade-v2-finish -> protov2-serving
    #    Protocol upgrade to version 2 complete. Server can now speak protocol
    #    version 2.
    #
    # protov2-serving -> protov1-serving
    #    This happens by default since protocol version 2 is the same as
    #    version 1 except for the handshake.

    state = b'protov1-serving'
    proto = sshv1protocolhandler(ui, fin, fout)
    protoswitched = False

    while not ev.is_set():
        if state == b'protov1-serving':
            # Commands are issued on new lines.
            request = fin.readline()[:-1]

            # Empty lines signal to terminate the connection.
            if not request:
                state = b'shutdown'
                continue

            # It looks like a protocol upgrade request. Transition state to
            # handle it.
            if request.startswith(b'upgrade '):
                if protoswitched:
                    _sshv1respondooberror(
                        fout,
                        ui.ferr,
                        b'cannot upgrade protocols multiple times',
                    )
                    state = b'shutdown'
                    continue

                state = b'upgrade-initial'
                continue

            available = wireprotov1server.commands.commandavailable(
                request, proto
            )

            # This command isn't available. Send an empty response and go
            # back to waiting for a new command.
            if not available:
                _sshv1respondbytes(fout, b'')
                continue

            rsp = wireprotov1server.dispatch(repo, proto, request)
            repo.ui.fout.flush()
            repo.ui.ferr.flush()

            if isinstance(rsp, bytes):
                _sshv1respondbytes(fout, rsp)
            elif isinstance(rsp, wireprototypes.bytesresponse):
                _sshv1respondbytes(fout, rsp.data)
            elif isinstance(rsp, wireprototypes.streamres):
                _sshv1respondstream(fout, rsp)
            elif isinstance(rsp, wireprototypes.streamreslegacy):
                _sshv1respondstream(fout, rsp)
            elif isinstance(rsp, wireprototypes.pushres):
                _sshv1respondbytes(fout, b'')
                _sshv1respondbytes(fout, b'%d' % rsp.res)
            elif isinstance(rsp, wireprototypes.pusherr):
                _sshv1respondbytes(fout, rsp.res)
            elif isinstance(rsp, wireprototypes.ooberror):
                _sshv1respondooberror(fout, ui.ferr, rsp.message)
            else:
                # rsp is by construction not bytes here. Bytes %-formatting
                # with %s rejects arbitrary objects on Python 3, so convert
                # explicitly; otherwise a TypeError would hide this error.
                raise error.ProgrammingError(
                    b'unhandled response type from '
                    b'wire protocol command: %s'
                    % stringutil.forcebytestr(rsp)
                )

        # For now, protocol version 2 serving just goes back to version 1.
        elif state == b'protov2-serving':
            state = b'protov1-serving'
            continue

        elif state == b'upgrade-initial':
            # We should never transition into this state if we've switched
            # protocols.
            assert not protoswitched
            assert proto.name == wireprototypes.SSHV1

            # Expected: upgrade <token> <capabilities>
            # If we get something else, the request is malformed. It could be
            # from a future client that has altered the upgrade line content.
            # We treat this as an unknown command.
            try:
                token, caps = request.split(b' ')[1:]
            except ValueError:
                _sshv1respondbytes(fout, b'')
                state = b'protov1-serving'
                continue

            # Send empty response if we don't support upgrading protocols.
            if not ui.configbool(b'experimental', b'sshserver.support-v2'):
                _sshv1respondbytes(fout, b'')
                state = b'protov1-serving'
                continue

            try:
                caps = urlreq.parseqs(caps)
            except ValueError:
                _sshv1respondbytes(fout, b'')
                state = b'protov1-serving'
                continue

            # We don't see an upgrade request to protocol version 2. Ignore
            # the upgrade request.
            wantedprotos = caps.get(b'proto', [b''])[0]
            if SSHV2 not in wantedprotos:
                _sshv1respondbytes(fout, b'')
                state = b'protov1-serving'
                continue

            # It looks like we can honor this upgrade request to protocol 2.
            # Filter the rest of the handshake protocol request lines.
            state = b'upgrade-v2-filter-legacy-handshake'
            continue

        elif state == b'upgrade-v2-filter-legacy-handshake':
            # Client should have sent legacy handshake after an ``upgrade``
            # request. Expected lines:
            #
            #    hello
            #    between
            #    pairs 81
            #    0000...-0000...

            ok = True
            for line in (b'hello', b'between', b'pairs 81'):
                request = fin.readline()[:-1]

                if request != line:
                    _sshv1respondooberror(
                        fout,
                        ui.ferr,
                        b'malformed handshake protocol: missing %s' % line,
                    )
                    ok = False
                    state = b'shutdown'
                    break

            if not ok:
                continue

            request = fin.read(81)
            if request != b'%s-%s' % (b'0' * 40, b'0' * 40):
                _sshv1respondooberror(
                    fout,
                    ui.ferr,
                    b'malformed handshake protocol: '
                    b'missing between argument value',
                )
                state = b'shutdown'
                continue

            state = b'upgrade-v2-finish'
            continue

        elif state == b'upgrade-v2-finish':
            # Send the upgrade response.
            fout.write(b'upgraded %s %s\n' % (token, SSHV2))
            servercaps = wireprotov1server.capabilities(repo, proto)
            rsp = b'capabilities: %s' % servercaps.data
            fout.write(b'%d\n%s\n' % (len(rsp), rsp))
            fout.flush()

            proto = sshv2protocolhandler(ui, fin, fout)
            protoswitched = True

            state = b'protov2-serving'
            continue

        elif state == b'shutdown':
            break

        else:
            raise error.ProgrammingError(
                b'unhandled ssh server state: %s' % state
            )
833 836
834 837
class sshserver(object):
    """Serve the SSH wire protocol over the ui's protected stdio streams."""

    def __init__(self, ui, repo, logfh=None):
        """Take over the ui's input/output streams.

        ``logfh``, when given, receives a log of the data written to the
        output stream and to ``ui.ferr``.
        """
        self._ui = ui
        self._repo = repo
        self._fin, self._fout = ui.protectfinout()

        # Log write I/O to stdout and stderr if configured.
        if logfh:
            self._fout = util.makeloggingfileobject(
                logfh, self._fout, b'o', logdata=True
            )
            ui.ferr = util.makeloggingfileobject(
                logfh, ui.ferr, b'e', logdata=True
            )

    def serve_forever(self):
        """Serve requests until the server shuts down.

        The streams obtained from ``ui.protectfinout()`` are handed back
        through ``ui.restorefinout()`` even if serving raises.
        """
        try:
            self.serveuntil(threading.Event())
        finally:
            self._ui.restorefinout(self._fin, self._fout)

    def serveuntil(self, ev):
        """Serve until a threading.Event is set."""
        _runsshserver(self._ui, self._repo, self._fin, self._fout, ev)
@@ -1,1613 +1,1615 b''
1 1 # Copyright 21 May 2005 - (c) 2005 Jake Edge <jake@edge2.net>
2 2 # Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
3 3 #
4 4 # This software may be used and distributed according to the terms of the
5 5 # GNU General Public License version 2 or any later version.
6 6
7 7 from __future__ import absolute_import
8 8
9 9 import collections
10 10 import contextlib
11 11
12 12 from .i18n import _
13 13 from .node import (
14 14 hex,
15 15 nullid,
16 16 )
17 17 from . import (
18 18 discovery,
19 19 encoding,
20 20 error,
21 21 match as matchmod,
22 22 narrowspec,
23 23 pycompat,
24 24 streamclone,
25 25 templatefilters,
26 26 util,
27 27 wireprotoframing,
28 28 wireprototypes,
29 29 )
30 30 from .interfaces import util as interfaceutil
31 31 from .utils import (
32 32 cborutil,
33 33 hashutil,
34 34 stringutil,
35 35 )
36 36
37 37 FRAMINGTYPE = b'application/mercurial-exp-framing-0006'
38 38
39 39 HTTP_WIREPROTO_V2 = wireprototypes.HTTP_WIREPROTO_V2
40 40
41 41 COMMANDS = wireprototypes.commanddict()
42 42
43 43 # Value inserted into cache key computation function. Change the value to
44 44 # force new cache keys for every command request. This should be done when
45 45 # there is a change to how caching works, etc.
46 46 GLOBAL_CACHE_VERSION = 1
47 47
48 48
def handlehttpv2request(rctx, req, res, checkperm, urlparts):
    """Validate and route an HTTP request made to the version 2 API.

    ``urlparts`` holds the URL components after the API prefix, laid out
    as ``<permission>/<command>``. The request is rejected with an error
    response on ``res`` when the URL is malformed, the permission check
    fails, the command is unknown or unavailable, or the ``Accept`` /
    ``Content-Type`` headers do not carry the framing media type. Requests
    that pass every check are handed to ``_processhttpv2request()``.
    """
    from .hgweb import common as hgwebcommon

    # URL space looks like: <permissions>/<command>, where <permission> can
    # be ``ro`` or ``rw`` to signal read-only or read-write, respectively.

    # Root URL does nothing meaningful... yet.
    if not urlparts:
        res.status = b'200 OK'
        res.headers[b'Content-Type'] = b'text/plain'
        res.setbodybytes(_(b'HTTP version 2 API handler'))
        return

    if len(urlparts) == 1:
        res.status = b'404 Not Found'
        res.headers[b'Content-Type'] = b'text/plain'
        res.setbodybytes(
            _(b'do not know how to process %s\n') % req.dispatchpath
        )
        return

    permission, command = urlparts[0:2]

    if permission not in (b'ro', b'rw'):
        res.status = b'404 Not Found'
        res.headers[b'Content-Type'] = b'text/plain'
        res.setbodybytes(_(b'unknown permission: %s') % permission)
        return

    if req.method != b'POST':
        res.status = b'405 Method Not Allowed'
        res.headers[b'Allow'] = b'POST'
        res.setbodybytes(_(b'commands require POST requests'))
        return

    # At some point we'll want to use our own API instead of recycling the
    # behavior of version 1 of the wire protocol...
    # TODO return reasonable responses - not responses that overload the
    # HTTP status line message for error reporting.
    try:
        checkperm(rctx, req, b'pull' if permission == b'ro' else b'push')
    except hgwebcommon.ErrorResponse as e:
        # Convert the exception to bytes with forcebytestr() (the
        # post-change side of the diff) rather than pycompat.bytestr().
        res.status = hgwebcommon.statusmessage(
            e.code, stringutil.forcebytestr(e)
        )
        for k, v in e.headers:
            res.headers[k] = v
        res.setbodybytes(b'permission denied')
        return

    # We have a special endpoint to reflect the request back at the client.
    if command == b'debugreflect':
        _processhttpv2reflectrequest(rctx.repo.ui, rctx.repo, req, res)
        return

    # Extra commands that we handle that aren't really wire protocol
    # commands. Think extra hard before making this hackery available to
    # extensions.
    extracommands = {b'multirequest'}

    if command not in COMMANDS and command not in extracommands:
        res.status = b'404 Not Found'
        res.headers[b'Content-Type'] = b'text/plain'
        res.setbodybytes(_(b'unknown wire protocol command: %s\n') % command)
        return

    repo = rctx.repo
    ui = repo.ui

    proto = httpv2protocolhandler(req, ui)

    if (
        not COMMANDS.commandavailable(command, proto)
        and command not in extracommands
    ):
        res.status = b'404 Not Found'
        res.headers[b'Content-Type'] = b'text/plain'
        res.setbodybytes(_(b'invalid wire protocol command: %s') % command)
        return

    # TODO consider cases where proxies may add additional Accept headers.
    if req.headers.get(b'Accept') != FRAMINGTYPE:
        res.status = b'406 Not Acceptable'
        res.headers[b'Content-Type'] = b'text/plain'
        res.setbodybytes(
            _(b'client MUST specify Accept header with value: %s\n')
            % FRAMINGTYPE
        )
        return

    if req.headers.get(b'Content-Type') != FRAMINGTYPE:
        res.status = b'415 Unsupported Media Type'
        # TODO we should send a response with appropriate media type,
        # since client does Accept it.
        res.headers[b'Content-Type'] = b'text/plain'
        res.setbodybytes(
            _(b'client MUST send Content-Type header with value: %s\n')
            % FRAMINGTYPE
        )
        return

    _processhttpv2request(ui, repo, req, res, permission, command, proto)
149 151
150 152
def _processhttpv2reflectrequest(ui, repo, req, res):
    """Echo the decoded frames of a request back to the client.

    This endpoint exists to help debug the wire protocol. The request is
    not dispatched as a command: every frame in the body is read, described
    in a log line, and fed to a server reactor whose reaction is logged as
    JSON. The accumulated log is returned as a plain-text response.
    """
    # Reflection APIs have a history of being abused, accidentally disclosing
    # sensitive data, etc. So we have a config knob.
    enabled = ui.configbool(b'experimental', b'web.api.debugreflect')
    if not enabled:
        res.status = b'404 Not Found'
        res.headers[b'Content-Type'] = b'text/plain'
        res.setbodybytes(_(b'debugreflect service not available'))
        return

    # We assume we have a unified framing protocol request body.

    reactor = wireprotoframing.serverreactor(ui)
    log = []

    while True:
        frame = wireprotoframing.readframe(req.bodyfh)

        if not frame:
            log.append(b'received: <no frame>')
            break

        described = b'received: %d %d %d %s' % (
            frame.typeid,
            frame.flags,
            frame.requestid,
            frame.payload,
        )
        log.append(described)

        action, meta = reactor.onframerecv(frame)
        log.append(templatefilters.json((action, meta)))

    # Record how the reactor reacts to end of input as the last entry.
    action, meta = reactor.oninputeof()
    meta[b'action'] = action
    log.append(templatefilters.json(meta))

    res.status = b'200 OK'
    res.headers[b'Content-Type'] = b'text/plain'
    res.setbodybytes(b'\n'.join(log))
196 198
197 199
def _processhttpv2request(ui, repo, req, res, authedperm, reqcommand, proto):
    """Run the frame-based commands of an already validated HTTPv2 request.

    Frames are read from the request body and fed to a server reactor.
    Each ``runcommand`` action is forwarded to ``_httpv2runcommand()``;
    processing stops as soon as a response has been sent.
    """
    # TODO Some HTTP clients are full duplex and can receive data before
    # the entire request is transmitted. Figure out a way to indicate support
    # for that so we can opt into full duplex mode.
    reactor = wireprotoframing.serverreactor(ui, deferoutput=True)
    outstream = None
    seencommand = False

    while True:
        frame = wireprotoframing.readframe(req.bodyfh)
        if not frame:
            break

        action, meta = reactor.onframerecv(frame)

        if action == b'wantframe':
            # Need more data before we can do anything.
            continue

        if action == b'error':
            # TODO define proper error mechanism.
            res.status = b'200 OK'
            res.headers[b'Content-Type'] = b'text/plain'
            res.setbodybytes(meta[b'message'] + b'\n')
            return

        if action != b'runcommand':
            raise error.ProgrammingError(
                b'unhandled action from frame processor: %s' % action
            )

        # Defer creating output stream because we need to wait for
        # protocol settings frames so proper encoding can be applied.
        if not outstream:
            outstream = reactor.makeoutputstream()

        responded = _httpv2runcommand(
            ui,
            repo,
            req,
            res,
            authedperm,
            reqcommand,
            reactor,
            outstream,
            meta,
            issubsequent=seencommand,
        )
        if responded:
            return

        seencommand = True

    action, meta = reactor.oninputeof()

    if action == b'noop':
        return

    if action != b'sendframes':
        raise error.ProgrammingError(
            b'unhandled action from frame processor: %s' % action
        )

    # We assume we haven't started sending the response yet. If we're
    # wrong, the response type will raise an exception.
    res.status = b'200 OK'
    res.headers[b'Content-Type'] = FRAMINGTYPE
    res.setbodygen(meta[b'framegen'])
270 272
271 273
def _httpv2runcommand(
    ui,
    repo,
    req,
    res,
    authedperm,
    reqcommand,
    reactor,
    outstream,
    command,
    issubsequent,
):
    """Dispatch a wire protocol command made from HTTPv2 requests.

    The authenticated permission (``authedperm``) along with the original
    command from the URL (``reqcommand``) are passed in. ``command`` is the
    decoded command request; its ``command``, ``args``, ``redirect`` and
    ``requestid`` keys are read here.

    Returns ``True`` when a response body has been set on ``res`` (either
    an error message or the frame generator) and ``False`` when the reactor
    answered with ``noop``.

    Raises ``error.ProgrammingError`` if ``authedperm`` or the permission
    of the registered command has an unexpected value, or if the reactor
    returns an unknown action.
    """
    # We already validated that the session has permissions to perform the
    # actions in ``authedperm``. In the unified frame protocol, the canonical
    # command to run is expressed in a frame. However, the URL also requested
    # to run a specific command. We need to be careful that the command we
    # run doesn't have permissions requirements greater than what was granted
    # by ``authedperm``.
    #
    # Our rule for this is we only allow one command per HTTP request and
    # that command must match the command in the URL. However, we make
    # an exception for the ``multirequest`` URL. This URL is allowed to
    # execute multiple commands. We double check permissions of each command
    # as it is invoked to ensure there is no privilege escalation.
    # TODO consider allowing multiple commands to regular command URLs
    # iff each command is the same.

    proto = httpv2protocolhandler(req, ui, args=command[b'args'])

    if reqcommand == b'multirequest':
        if not COMMANDS.commandavailable(command[b'command'], proto):
            # TODO proper error mechanism
            res.status = b'200 OK'
            res.headers[b'Content-Type'] = b'text/plain'
            res.setbodybytes(
                _(b'wire protocol command not available: %s')
                % command[b'command']
            )
            return True

        # These are explicit raises rather than asserts because asserts are
        # elided under -O, which would silently drop the sanity checks that
        # guard the permission comparison below.
        if authedperm not in (b'ro', b'rw'):
            raise error.ProgrammingError(
                b'authenticated permission must be "ro" or "rw"'
            )
        wirecommand = COMMANDS[command[b'command']]
        if wirecommand.permission not in (b'push', b'pull'):
            raise error.ProgrammingError(
                b'wire protocol command permission must be "push" or "pull"'
            )

        if authedperm == b'ro' and wirecommand.permission != b'pull':
            # TODO proper error mechanism
            res.status = b'403 Forbidden'
            res.headers[b'Content-Type'] = b'text/plain'
            res.setbodybytes(
                _(b'insufficient permissions to execute command: %s')
                % command[b'command']
            )
            return True

        # TODO should we also call checkperm() here? Maybe not if we're going
        # to overhaul that API. The granted scope from the URL check should
        # be good enough.

    else:
        # Don't allow multiple commands outside of ``multirequest`` URL.
        if issubsequent:
            # TODO proper error mechanism
            res.status = b'200 OK'
            res.headers[b'Content-Type'] = b'text/plain'
            res.setbodybytes(
                _(b'multiple commands cannot be issued to this URL')
            )
            return True

        if reqcommand != command[b'command']:
            # TODO define proper error mechanism
            res.status = b'200 OK'
            res.headers[b'Content-Type'] = b'text/plain'
            res.setbodybytes(_(b'command in frame must match command in URL'))
            return True

    res.status = b'200 OK'
    res.headers[b'Content-Type'] = FRAMINGTYPE

    try:
        objs = dispatch(repo, proto, command[b'command'], command[b'redirect'])

        action, meta = reactor.oncommandresponsereadyobjects(
            outstream, command[b'requestid'], objs
        )

    except error.WireprotoCommandError as e:
        action, meta = reactor.oncommanderror(
            outstream, command[b'requestid'], e.message, e.messageargs
        )

    except Exception as e:
        # Any other failure is reported to the client as a server error.
        action, meta = reactor.onservererror(
            outstream,
            command[b'requestid'],
            _(b'exception when invoking command: %s')
            % stringutil.forcebytestr(e),
        )

    if action == b'sendframes':
        res.setbodygen(meta[b'framegen'])
        return True
    elif action == b'noop':
        return False
    else:
        raise error.ProgrammingError(
            b'unhandled event from reactor: %s' % action
        )
386 388
387 389
def getdispatchrepo(repo, proto, command):
    """Return ``repo`` filtered by the view named in ``server.view``."""
    return repo.filtered(repo.ui.config(b'server', b'view'))
391 393
392 394
def dispatch(repo, proto, command, redirect):
    """Run a wire protocol command.

    This is a generator yielding the objects that will be sent to the
    client. Output is routed through a response cacher when the command
    declares a cache key function and ``makeresponsecacher()`` provides a
    cacher; otherwise the command's objects are yielded unchanged.
    """
    repo = getdispatchrepo(repo, proto, command)

    entry = COMMANDS[command]
    func = entry.func
    args = proto.getargs(entry.args)

    # There is some duplicate boilerplate code here for calling the command and
    # emitting objects. It is either that or a lot of indented code that looks
    # like a pyramid (since there are a lot of code paths that result in not
    # using the cacher).
    def callcommand():
        return func(repo, proto, **pycompat.strkwargs(args))

    # Request is not cacheable. Don't bother instantiating a cacher.
    if not entry.cachekeyfn:
        for obj in callcommand():
            yield obj
        return

    redirecttargets = redirect[b'targets'] if redirect else []
    redirecthashes = redirect[b'hashes'] if redirect else []

    cacher = makeresponsecacher(
        repo,
        proto,
        command,
        args,
        cborutil.streamencode,
        redirecttargets=redirecttargets,
        redirecthashes=redirecthashes,
    )

    # But we have no cacher. Do default handling.
    if not cacher:
        for obj in callcommand():
            yield obj
        return

    with cacher:
        cachekey = entry.cachekeyfn(
            repo, proto, cacher, **pycompat.strkwargs(args)
        )

        # No cache key or the cacher doesn't like it. Do default handling.
        if cachekey is None or not cacher.setcachekey(cachekey):
            for obj in callcommand():
                yield obj
            return

        # Serve it from the cache, if possible.
        cached = cacher.lookup()
        if cached:
            for obj in cached[b'objs']:
                yield obj
            return

        # Else call the command and feed its output into the cacher, allowing
        # the cacher to buffer/mutate objects as it desires.
        for produced in callcommand():
            for emitted in cacher.onobject(produced):
                yield emitted

        for trailing in cacher.onfinished():
            yield trailing
468 470
469 471
@interfaceutil.implementer(wireprototypes.baseprotocolhandler)
class httpv2protocolhandler(object):
    """Protocol handler for version 2 commands received over HTTP.

    ``args`` is the mapping of arguments the client sent for the command.
    It may be ``None`` when the handler is built before any command request
    has been decoded.
    """

    def __init__(self, req, ui, args=None):
        self._req = req
        self._ui = ui
        self._args = args

    @property
    def name(self):
        return HTTP_WIREPROTO_V2

    def getargs(self, args):
        """Resolve the arguments to pass to a command.

        ``args`` is the command's argument specification: a dict mapping
        argument names to metadata dicts with ``required``, ``default`` and
        ``type`` keys. Returns a dict holding a value for every declared
        argument, using the client-supplied value when present and the
        declared default otherwise.

        Raises ``error.WireprotoCommandError`` if the client passed an
        argument the command does not declare, or omitted a required one.
        """
        # A handler constructed without arguments is treated as if the
        # client passed none, instead of failing on ``set(None)``.
        passed = self._args if self._args is not None else {}

        # First look for args that were passed but aren't registered on this
        # command.
        extra = set(passed) - set(args)
        if extra:
            raise error.WireprotoCommandError(
                b'unsupported argument to command: %s'
                % b', '.join(sorted(extra))
            )

        # And look for required arguments that are missing.
        missing = {a for a in args if args[a][b'required']} - set(passed)

        if missing:
            raise error.WireprotoCommandError(
                b'missing required arguments: %s' % b', '.join(sorted(missing))
            )

        # Now derive the arguments to pass to the command, taking into
        # account the arguments specified by the client.
        data = {}
        for k, meta in sorted(args.items()):
            # This argument wasn't passed by the client.
            if k not in passed:
                data[k] = meta[b'default']()
                continue

            v = passed[k]

            # Sets may be expressed as lists. Silently normalize.
            if meta[b'type'] == b'set' and isinstance(v, list):
                v = set(v)

            # TODO consider more/stronger type validation.

            data[k] = v

        return data

    def getprotocaps(self):
        # Protocol capabilities are currently not implemented for HTTP V2.
        return set()

    def getpayload(self):
        raise NotImplementedError

    @contextlib.contextmanager
    def mayberedirectstdio(self):
        raise NotImplementedError

    def client(self):
        raise NotImplementedError

    def addcapabilities(self, repo, caps):
        # Capabilities are returned unchanged.
        return caps

    def checkperm(self, perm):
        raise NotImplementedError
539 541
540 542
def httpv2apidescriptor(req, repo):
    """Return the version 2 capabilities for ``repo`` as seen by ``req``."""
    return _capabilitiesv2(repo, httpv2protocolhandler(req, repo.ui))
545 547
546 548
def _capabilitiesv2(repo, proto):
    """Obtain the set of capabilities for version 2 transports.

    These capabilities are distinct from the capabilities for version 1
    transports. The result describes every registered command (arguments
    and permissions), the framing media types, the path filter prefixes,
    the raw repository formats and, when any are advertised, the redirect
    targets. It is passed through ``proto.addcapabilities()`` before being
    returned.
    """
    commands = {}
    caps = {
        b'commands': commands,
        b'framingmediatypes': [FRAMINGTYPE],
        b'pathfilterprefixes': set(narrowspec.VALID_PREFIXES),
    }

    for command, entry in COMMANDS.items():
        args = {}

        for arg, meta in entry.args.items():
            described = {
                # TODO should this be a normalized type using CBOR's
                # terminology?
                b'type': meta[b'type'],
                b'required': meta[b'required'],
            }
            args[arg] = described

            if not meta[b'required']:
                described[b'default'] = meta[b'default']()

            if meta[b'validvalues']:
                described[b'validvalues'] = meta[b'validvalues']

        # TODO this type of check should be defined in a per-command callback.
        if (
            command == b'rawstorefiledata'
            and not streamclone.allowservergeneration(repo)
        ):
            continue

        descriptor = {
            b'args': args,
            b'permissions': [entry.permission],
        }
        commands[command] = descriptor

        if entry.extracapabilitiesfn:
            descriptor.update(entry.extracapabilitiesfn(repo, proto))

    caps[b'rawrepoformats'] = sorted(repo.requirements & repo.supportedformats)

    targets = getadvertisedredirecttargets(repo, proto)
    if targets:
        advertised = []
        caps[b'redirect'] = {
            b'targets': advertised,
            b'hashes': [b'sha256', b'sha1'],
        }

        for target in targets:
            item = {
                b'name': target[b'name'],
                b'protocol': target[b'protocol'],
                b'uris': target[b'uris'],
            }

            # Optional keys are copied only when the target defines them.
            for key in (b'snirequired', b'tlsversions'):
                if key in target:
                    item[key] = target[key]

            advertised.append(item)

    return proto.addcapabilities(repo, caps)
615 617
616 618
def getadvertisedredirecttargets(repo, proto):
    """Obtain a list of content redirect targets.

    The returned list holds the potential redirect targets that will be
    advertised in capabilities data. Every entry is a dict that MUST
    define these keys:

    name
       Identifier of the redirect target. Clients use it to refer to the
       target, and it is transferred as part of every command request.

    protocol
       Network protocol used by the target. Typically the string in front
       of the ``://`` in a URL. e.g. ``https``.

    uris
       List of representative URIs for the target. Clients can use them to
       test parsing for compatibility or to order their preference for
       which target to use.

    These optional keys are also recognized:

    snirequired
       Bool indicating if Server Name Indication (SNI) is required to
       connect to the target.

    tlsversions
       List of bytes indicating which TLS versions the target supports.

    By default, clients reflect the target order advertised by servers
    and servers will use the first client-advertised target when picking
    a redirect target. So targets should be advertised in the order the
    server prefers they be used.

    This implementation advertises no targets.
    """
    targets = []
    return targets
654 656
655 657
def wireprotocommand(
    name,
    args=None,
    permission=b'push',
    cachekeyfn=None,
    extracapabilitiesfn=None,
):
    """Decorator to declare a wire protocol command.

    ``name`` is the name of the wire protocol command being provided.

    ``args`` is a dict defining the arguments accepted by the command. Its
    keys are argument names and its values are dicts with these keys:

       ``type``
          The argument data type. Must be one of the following string
          literals: ``bytes``, ``int``, ``list``, ``dict``, ``set``,
          or ``bool``.

       ``default``
          A callable returning the default value for this argument. If not
          specified, ``None`` will be the default value.

       ``example``
          An example value for this argument.

       ``validvalues``
          Set of recognized values for this argument.

    ``permission`` defines the permission type needed to run this command.
    Can be ``push`` or ``pull``. These roughly map to read-write and read-only,
    respectively. Default is to assume command requires ``push`` permissions
    because otherwise commands not declaring their permissions could modify
    a repository that is supposed to be read-only.

    ``cachekeyfn`` defines an optional callable that can derive the
    cache key for this request.

    ``extracapabilitiesfn`` defines an optional callable that defines extra
    command capabilities/parameters that are advertised next to the command
    in the capabilities data structure describing the server. The callable
    receives as arguments the repository and protocol objects. It returns
    a dict of extra fields to add to the command descriptor.

    Wire protocol commands are generators of objects to be serialized and
    sent to the client.

    If a command raises an uncaught exception, this will be translated into
    a command error.

    All commands can opt in to being cacheable by defining a function
    (``cachekeyfn``) that is called to derive a cache key. This function
    receives the same arguments as the command itself plus a ``cacher``
    argument containing the active cacher for the request and returns a bytes
    containing the key in a cache the response to this command may be cached
    under.

    ``error.ProgrammingError`` is raised for an invalid declaration, and
    when a command with the same name has already been registered.
    """
    # The command is registered for every transport declaring version 2.
    transports = {
        key
        for key, transport in wireprototypes.TRANSPORTS.items()
        if transport[b'version'] == 2
    }

    if permission not in (b'push', b'pull'):
        raise error.ProgrammingError(
            b'invalid wire protocol permission; '
            b'got %s; expected "push" or "pull"' % permission
        )

    if args is None:
        args = {}

    if not isinstance(args, dict):
        raise error.ProgrammingError(
            b'arguments for version 2 commands must be declared as dicts'
        )

    validtypes = (
        b'bytes',
        b'int',
        b'list',
        b'dict',
        b'set',
        b'bool',
    )

    for arg, meta in args.items():
        if arg == b'*':
            raise error.ProgrammingError(
                b'* argument name not allowed on version 2 commands'
            )

        if not isinstance(meta, dict):
            raise error.ProgrammingError(
                b'arguments for version 2 commands '
                b'must declare metadata as a dict'
            )

        if b'type' not in meta:
            raise error.ProgrammingError(
                b'%s argument for command %s does not '
                b'declare type field' % (arg, name)
            )

        if meta[b'type'] not in validtypes:
            raise error.ProgrammingError(
                b'%s argument for command %s has '
                b'illegal type: %s' % (arg, name, meta[b'type'])
            )

        if b'example' not in meta:
            raise error.ProgrammingError(
                b'%s argument for command %s does not '
                b'declare example field' % (arg, name)
            )

        # An argument is required exactly when it declares no default. This
        # must be computed before the default is filled in below.
        meta[b'required'] = b'default' not in meta

        meta.setdefault(b'default', lambda: None)
        meta.setdefault(b'validvalues', None)

    def register(func):
        if name in COMMANDS:
            raise error.ProgrammingError(
                b'%s command already registered for version 2' % name
            )

        COMMANDS[name] = wireprototypes.commandentry(
            func,
            args=args,
            transports=transports,
            permission=permission,
            cachekeyfn=cachekeyfn,
            extracapabilitiesfn=extracapabilitiesfn,
        )

        return func

    return register
791 793
792 794
def makecommandcachekeyfn(command, localversion=None, allargs=False):
    """Construct a cache key derivation function with common features.

    The returned function hashes, by default:

    * The command name.
    * A global cache version number.
    * A local cache version number (passed via ``localversion``).
    * All the arguments passed to the command.
    * The media type used.
    * Wire protocol version string.
    * The repository path.

    ``error.ProgrammingError`` is raised unless ``allargs`` is true and
    ``localversion`` is given.
    """
    if not allargs:
        raise error.ProgrammingError(
            b'only allargs=True is currently supported'
        )

    if localversion is None:
        raise error.ProgrammingError(b'must set localversion argument value')

    def cachekeyfn(repo, proto, cacher, **args):
        # Commands that mutate the repo can not be cached.
        if COMMANDS[command].permission == b'push':
            return None

        # TODO config option to disable caching.

        # Our key derivation strategy is to construct a data structure
        # holding everything that could influence cacheability and to hash
        # the CBOR representation of that. Using CBOR seems like it might
        # be overkill. However, simpler hashing mechanisms are prone to
        # duplicate input issues. e.g. if you just concatenate two values,
        # "foo"+"bar" is identical to "fo"+"obar". Using CBOR provides
        # "padding" between values and prevents these problems.

        # Seed the hash with various data.
        state = {}
        # To invalidate all cache keys.
        state[b'globalversion'] = GLOBAL_CACHE_VERSION
        # More granular cache key invalidation.
        state[b'localversion'] = localversion
        # Cache keys are segmented by command.
        state[b'command'] = command
        # Throw in the media type and API version strings so changes
        # to exchange semantics invalidate the cache.
        state[b'mediatype'] = FRAMINGTYPE
        state[b'version'] = HTTP_WIREPROTO_V2
        # So same requests for different repos don't share cache keys.
        state[b'repo'] = repo.root

        # The arguments passed to us will have already been normalized.
        # Default values will be set, etc. This is important because it
        # means that it doesn't matter if clients send an explicit argument
        # or rely on the default value: it will all normalize to the same
        # set of arguments on the server and therefore the same cache key.
        #
        # Arguments by their very nature must support being encoded to CBOR.
        # And the CBOR encoder is deterministic. So we hash the arguments
        # by feeding the CBOR of their representation into the hasher.
        if allargs:
            state[b'args'] = pycompat.byteskwargs(args)

        # Let the cacher adjust the state before it is hashed.
        cacher.adjustcachekeystate(state)

        digest = hashutil.sha1()
        for piece in cborutil.streamencode(state):
            digest.update(piece)

        return pycompat.sysbytes(digest.hexdigest())

    return cachekeyfn
868 870
869 871
def makeresponsecacher(
    repo, proto, command, args, objencoderfn, redirecttargets, redirecthashes
):
    """Construct a cacher for a cacheable command.

    Returns an ``iwireprotocolcommandcacher`` instance. This default
    implementation returns ``None``, i.e. no cacher.

    Extensions can monkeypatch this function to provide custom caching
    backends.
    """
    cacher = None
    return cacher
881 883
882 884
def resolvenodes(repo, revisions):
    """Resolve nodes from a revisions specifier data structure.

    ``revisions`` must be a list of specifier dicts, each carrying a
    ``type`` key of ``changesetexplicit``, ``changesetexplicitdepth`` or
    ``changesetdagrange``. Returns the resolved nodes as a list in first
    seen order, without duplicates.

    Raises ``error.WireprotoCommandError`` for a malformed specifier.
    """
    cl = repo.changelog
    clhasnode = cl.hasnode

    seen = set()
    nodes = []

    def record(node):
        # Keep first-seen order while dropping duplicates.
        if node not in seen:
            nodes.append(node)
            seen.add(node)

    if not isinstance(revisions, list):
        raise error.WireprotoCommandError(
            b'revisions must be defined as an array'
        )

    for spec in revisions:
        if b'type' not in spec:
            raise error.WireprotoCommandError(
                b'type key not present in revision specifier'
            )

        typ = spec[b'type']

        if typ == b'changesetexplicit':
            if b'nodes' not in spec:
                raise error.WireprotoCommandError(
                    b'nodes key not present in changesetexplicit revision '
                    b'specifier'
                )

            for node in spec[b'nodes']:
                record(node)

        elif typ == b'changesetexplicitdepth':
            for key in (b'nodes', b'depth'):
                if key not in spec:
                    raise error.WireprotoCommandError(
                        b'%s key not present in changesetexplicitdepth revision '
                        b'specifier',
                        (key,),
                    )

            revs = repo.revs(
                b'ancestors(%ln, %s)', spec[b'nodes'], spec[b'depth'] - 1
            )
            for rev in revs:
                record(cl.node(rev))

        elif typ == b'changesetdagrange':
            for key in (b'roots', b'heads'):
                if key not in spec:
                    raise error.WireprotoCommandError(
                        b'%s key not present in changesetdagrange revision '
                        b'specifier',
                        (key,),
                    )

            if not spec[b'heads']:
                raise error.WireprotoCommandError(
                    b'heads key in changesetdagrange cannot be empty'
                )

            # Only roots present in the changelog are used as common
            # nodes; with no roots given, the null node is used.
            if spec[b'roots']:
                common = [n for n in spec[b'roots'] if clhasnode(n)]
            else:
                common = [nullid]

            outgoing = discovery.outgoing(repo, common, spec[b'heads'])
            for node in outgoing.missing:
                record(node)

        else:
            raise error.WireprotoCommandError(
                b'unknown revision specifier type: %s', (typ,)
            )

    return nodes
964 966
965 967
@wireprotocommand(b'branchmap', permission=b'pull')
def branchmapv2(repo, proto):
    """Emit the repository branch map with branch names passed through
    ``encoding.fromlocal()``; the mapped values are sent unchanged."""
    encoded = {}
    for branch, value in pycompat.iteritems(repo.branchmap()):
        encoded[encoding.fromlocal(branch)] = value

    yield encoded
972 974
973 975
@wireprotocommand(b'capabilities', permission=b'pull')
def capabilitiesv2(repo, proto):
    """Emit the value computed by ``_capabilitiesv2()`` as a single object."""
    caps = _capabilitiesv2(repo, proto)
    yield caps
977 979
978 980
@wireprotocommand(
    b'changesetdata',
    args={
        b'revisions': {
            b'type': b'list',
            b'example': [
                {
                    b'type': b'changesetexplicit',
                    b'nodes': [b'abcdef...'],
                }
            ],
        },
        b'fields': {
            b'type': b'set',
            b'default': set,
            b'example': {b'parents', b'revision'},
            b'validvalues': {b'bookmarks', b'parents', b'phase', b'revision'},
        },
    },
    permission=b'pull',
)
def changesetdata(repo, proto, revisions, fields):
    """Emit changeset records for the nodes selected by ``revisions``.

    The first object emitted is ``{b'totalitems': <count>}``. One map per
    node follows, optionally carrying ``parents``, ``phase`` and
    ``bookmarks`` depending on ``fields``. When ``revision`` is requested,
    the map announces the payload through ``fieldsfollowing`` and the raw
    changelog text is emitted right after it. Finally, if ``bookmarks`` was
    requested, bookmarks on nodes that were not part of the result are
    emitted as ``{node, bookmarks}`` maps.
    """
    # TODO look for unknown fields and abort when they can't be serviced.
    # This could probably be validated by dispatcher using validvalues.

    changelog = repo.changelog
    nodes = resolvenodes(repo, revisions)
    ispublishing = repo.publishing()

    if nodes:
        repo.hook(b'preoutgoing', throw=True, source=b'serve')

    yield {b'totalitems': len(nodes)}

    wantparents = b'parents' in fields
    wantphase = b'phase' in fields
    wantbookmarks = b'bookmarks' in fields
    wantrevision = b'revision' in fields

    # The phases of nodes already transferred to the client may have changed
    # since the client last requested data. We send phase-only records
    # for these revisions, if requested.
    # TODO actually do this. We'll probably want to emit phase heads
    # in the ancestry set of the outgoing revisions. This will ensure
    # that phase updates within that set are seen.

    # node -> set of bookmark names pointing at it.
    marksbynode = {}
    for mark, marknode in repo._bookmarks.items():
        marksbynode.setdefault(marknode, set()).add(mark)

    # It is already topologically sorted by revision number.
    for node in nodes:
        record = {b'node': node}

        if wantparents:
            record[b'parents'] = changelog.parents(node)

        if wantphase:
            if ispublishing:
                record[b'phase'] = b'public'
            else:
                record[b'phase'] = repo[node].phasestr()

        if wantbookmarks and node in marksbynode:
            # Consume the entry so it is not re-sent in the trailing
            # bookmarks-only section.
            record[b'bookmarks'] = sorted(marksbynode.pop(node))

        # TODO make it possible for extensions to wrap a function or register
        # a handler to service custom fields.

        if wantrevision:
            text = changelog.revision(node)
            record[b'fieldsfollowing'] = [(b'revision', len(text))]
            yield record
            yield text
        else:
            yield record

    # If requested, send bookmarks from nodes that didn't have revision
    # data sent so receiver is aware of any bookmark updates.
    if wantbookmarks:
        for marknode, marks in sorted(pycompat.iteritems(marksbynode)):
            yield {
                b'node': marknode,
                b'bookmarks': sorted(marks),
            }
1075 1077
1076 1078
class FileAccessError(Exception):
    """Raised when a specific file cannot be accessed.

    Carries the offending ``path`` plus a ``msg`` format string and its
    ``args`` so callers can build a protocol-level error from them.
    """

    def __init__(self, path, msg, args):
        # ``args`` intentionally replaces the standard exception args.
        self.path, self.msg, self.args = path, msg, args
1084 1086
1085 1087
def getfilestore(repo, proto, path):
    """Return the file storage object for ``path``.

    Kept as a standalone function so extensions can monkeypatch it to add
    access control.

    Raises ``FileAccessError`` when the storage object is empty.
    """
    store = repo.file(path)

    # repo.file() seems to work even if the file doesn't exist, so an
    # "empty" store is reported as an unknown file.
    if len(store) == 0:
        raise FileAccessError(path, b'unknown file: %s', (path,))

    return store
1100 1102
1101 1103
def emitfilerevisions(repo, path, revisions, linknodes, fields):
    """Generate wire objects for an iterable of file revision objects.

    For every revision a map with ``node`` is emitted, extended with
    ``parents`` and ``linknode`` (looked up in ``linknodes``) when those
    names are in ``fields``. When ``revision`` is in ``fields``, the map
    also gets a ``fieldsfollowing`` entry describing the payload, which is
    emitted right after the map: the full text when available, otherwise
    the delta (with ``deltabasenode`` added to the map).
    """
    for rev in revisions:
        meta = {b'node': rev.node}

        if b'parents' in fields:
            meta[b'parents'] = [rev.p1node, rev.p2node]

        if b'linknode' in fields:
            meta[b'linknode'] = linknodes[rev.node]

        if b'revision' not in fields:
            yield meta
            continue

        if rev.revision is not None:
            kind, payload = b'revision', rev.revision
        else:
            meta[b'deltabasenode'] = rev.basenode
            kind, payload = b'delta', rev.delta

        meta[b'fieldsfollowing'] = [(kind, len(payload))]

        yield meta
        yield payload
1133 1135
1134 1136
def makefilematcher(repo, pathfilter):
    """Build a matcher from a path filter dict.

    ``pathfilter`` may be falsy, in which case a matcher with no patterns
    is built. Otherwise its optional ``include`` and ``exclude`` lists are
    validated (every pattern must start with ``path:`` or ``rootfilesin:``)
    and handed to ``matchmod.match()``.

    Raises ``error.WireprotoCommandError`` on an invalid pattern. The
    returned matcher is wrapped with ``repo.narrowmatch()``.
    """
    if not pathfilter:
        return repo.narrowmatch(matchmod.match(repo.root, b''))

    allowedprefixes = (b'path:', b'rootfilesin:')

    # Validate values.
    for key in (b'include', b'exclude'):
        for pattern in pathfilter.get(key, []):
            if pattern.startswith(allowedprefixes):
                continue

            raise error.WireprotoCommandError(
                b'%s pattern must begin with `path:` or `rootfilesin:`; '
                b'got %s',
                (key, pattern),
            )

    matcher = matchmod.match(
        repo.root,
        b'',
        include=pathfilter.get(b'include', []),
        exclude=pathfilter.get(b'exclude', []),
    )

    # Requested patterns could include files not in the local store. So
    # filter those out.
    return repo.narrowmatch(matcher)
1162 1164
1163 1165
@wireprotocommand(
    b'filedata',
    args={
        b'haveparents': {
            b'type': b'bool',
            b'default': lambda: False,
            b'example': True,
        },
        b'nodes': {
            b'type': b'list',
            b'example': [b'0123456...'],
        },
        b'fields': {
            b'type': b'set',
            b'default': set,
            b'example': {b'parents', b'revision'},
            b'validvalues': {b'parents', b'revision', b'linknode'},
        },
        b'path': {
            b'type': b'bytes',
            b'example': b'foo.txt',
        },
    },
    permission=b'pull',
    # TODO censoring a file revision won't invalidate the cache.
    # Figure out a way to take censoring into account when deriving
    # the cache key.
    cachekeyfn=makecommandcachekeyfn(b'filedata', 1, allargs=True),
)
def filedata(repo, proto, haveparents, nodes, fields, path):
    """Emit revision data for the given ``nodes`` of a single file.

    Emits ``{b'totalitems': len(nodes)}`` followed by the objects produced
    by ``emitfilerevisions()``.

    Raises ``error.WireprotoCommandError`` when the file store cannot be
    obtained or when one of ``nodes`` is unknown to it.
    """
    # TODO this API allows access to file revisions that are attached to
    # secret changesets. filesdata does not have this problem. Maybe this
    # API should be deleted?

    try:
        # Extensions may wish to access the protocol handler.
        store = getfilestore(repo, proto, path)
    except FileAccessError as e:
        raise error.WireprotoCommandError(e.msg, e.args)

    nodeforrev = repo.changelog.node

    # file node -> changeset node
    linknodes = {}

    # Validate requested nodes.
    for fnode in nodes:
        try:
            store.rev(fnode)
        except error.LookupError:
            raise error.WireprotoCommandError(
                b'unknown file node: %s', (hex(fnode),)
            )

        # TODO by creating the filectx against a specific file revision
        # instead of changeset, linkrev() is always used. This is wrong for
        # cases where linkrev() may refer to a hidden changeset. But since this
        # API doesn't know anything about changesets, we're not sure how to
        # disambiguate the linknode. Perhaps we should delete this API?
        introrev = repo.filectx(path, fileid=fnode).introrev()
        linknodes[fnode] = nodeforrev(introrev)

    revisions = store.emitrevisions(
        nodes,
        revisiondata=b'revision' in fields,
        assumehaveparentrevisions=haveparents,
    )

    yield {b'totalitems': len(nodes)}

    for item in emitfilerevisions(repo, path, revisions, linknodes, fields):
        yield item
1236 1238
1237 1239
def filesdatacapabilities(repo, proto):
    """Return the extra capabilities map for the ``filesdata`` command.

    The map has a single ``recommendedbatchsize`` entry read from the
    ``experimental.server.filesdata.recommended-batch-size`` config option.
    """
    return {
        b'recommendedbatchsize': repo.ui.configint(
            b'experimental', b'server.filesdata.recommended-batch-size'
        ),
    }
1245 1247
1246 1248
@wireprotocommand(
    b'filesdata',
    args={
        b'haveparents': {
            b'type': b'bool',
            b'default': lambda: False,
            b'example': True,
        },
        b'fields': {
            b'type': b'set',
            b'default': set,
            b'example': {b'parents', b'revision'},
            b'validvalues': {
                b'firstchangeset',
                b'linknode',
                b'parents',
                b'revision',
            },
        },
        b'pathfilter': {
            b'type': b'dict',
            b'default': lambda: None,
            b'example': {b'include': [b'path:tests']},
        },
        b'revisions': {
            b'type': b'list',
            b'example': [
                {
                    b'type': b'changesetexplicit',
                    b'nodes': [b'abcdef...'],
                }
            ],
        },
    },
    permission=b'pull',
    # TODO censoring a file revision won't invalidate the cache.
    # Figure out a way to take censoring into account when deriving
    # the cache key.
    cachekeyfn=makecommandcachekeyfn(b'filesdata', 1, allargs=True),
    extracapabilitiesfn=filesdatacapabilities,
)
def filesdata(repo, proto, haveparents, fields, pathfilter, revisions):
    """Emit file revision data for the changesets selected by ``revisions``.

    The changesets are resolved with ``resolvenodes()`` and the paths are
    restricted by the matcher built from ``pathfilter``. Output is:

    * one map with ``totalpaths`` and ``totalitems``;
    * for every path, in sorted order, a map with ``path`` and
      ``totalitems`` followed by the objects from ``emitfilerevisions()``.

    When ``haveparents`` is true only the files recorded in each changeset
    are considered; otherwise the full manifest of each changeset is
    walked.

    Raises ``error.WireprotoCommandError`` when a file store cannot be
    obtained (plus whatever ``resolvenodes()`` / ``makefilematcher()``
    raise for invalid input).
    """
    # TODO This should operate on a repo that exposes obsolete changesets. There
    # is a race between a client making a push that obsoletes a changeset and
    # another client fetching files data for that changeset. If a client has a
    # changeset, it should probably be allowed to access files data for that
    # changeset.

    outgoing = resolvenodes(repo, revisions)
    filematcher = makefilematcher(repo, pathfilter)

    # path -> {fnode: linknode}
    fnodes = collections.defaultdict(dict)

    # We collect the set of relevant file revisions by iterating the changeset
    # revisions and either walking the set of files recorded in the changeset
    # or by walking the manifest at that revision. There is probably room for a
    # storage-level API to request this data, as it can be expensive to compute
    # and would benefit from caching or alternate storage from what revlogs
    # provide.
    for node in outgoing:
        ctx = repo[node]
        mctx = ctx.manifestctx()
        md = mctx.read()

        if haveparents:
            checkpaths = ctx.files()
        else:
            checkpaths = md.keys()

        for path in checkpaths:
            try:
                fnode = md[path]
            except KeyError:
                # The changeset's file list can name paths that are absent
                # from its manifest (e.g. files it removed). There is no
                # file revision to send for those, so skip them instead of
                # aborting the whole command.
                continue

            # Test membership explicitly so the defaultdict does not grow
            # an empty entry for paths that end up filtered out.
            if path in fnodes and fnode in fnodes[path]:
                continue

            if not filematcher(path):
                continue

            # The first changeset introducing a file node is its linknode.
            fnodes[path].setdefault(fnode, node)

    yield {
        b'totalpaths': len(fnodes),
        b'totalitems': sum(len(v) for v in fnodes.values()),
    }

    for path, filenodes in sorted(fnodes.items()):
        try:
            store = getfilestore(repo, proto, path)
        except FileAccessError as e:
            raise error.WireprotoCommandError(e.msg, e.args)

        yield {
            b'path': path,
            b'totalitems': len(filenodes),
        }

        revisions = store.emitrevisions(
            filenodes.keys(),
            revisiondata=b'revision' in fields,
            assumehaveparentrevisions=haveparents,
        )

        for o in emitfilerevisions(repo, path, revisions, filenodes, fields):
            yield o
1352 1354
1353 1355
@wireprotocommand(
    b'heads',
    args={
        b'publiconly': {
            b'type': b'bool',
            b'default': lambda: False,
            b'example': False,
        },
    },
    permission=b'pull',
)
def headsv2(repo, proto, publiconly):
    """Emit the repository heads.

    When ``publiconly`` is true the heads are taken from the ``immutable``
    filtered view of the repository.
    """
    source = repo.filtered(b'immutable') if publiconly else repo

    yield source.heads()
1370 1372
1371 1373
@wireprotocommand(
    b'known',
    args={
        b'nodes': {
            b'type': b'list',
            b'default': list,
            b'example': [b'deadbeef'],
        },
    },
    permission=b'pull',
)
def knownv2(repo, proto, nodes):
    """Emit a bytestring with one ``1`` or ``0`` per entry returned by
    ``repo.known(nodes)``, ``1`` standing for a truthy entry."""
    flags = [b'1' if isknown else b'0' for isknown in repo.known(nodes)]
    yield b''.join(flags)
1386 1388
1387 1389
@wireprotocommand(
    b'listkeys',
    args={
        b'namespace': {
            b'type': b'bytes',
            b'example': b'ns',
        },
    },
    permission=b'pull',
)
def listkeysv2(repo, proto, namespace):
    """Emit the pushkey entries of ``namespace`` as a single map.

    The namespace is converted with ``encoding.tolocal()`` before the
    lookup; keys and values of the result are converted back with
    ``encoding.fromlocal()``.
    """
    localkeys = repo.listkeys(encoding.tolocal(namespace))

    encoded = {}
    for name, value in pycompat.iteritems(localkeys):
        encoded[encoding.fromlocal(name)] = encoding.fromlocal(value)

    yield encoded
1406 1408
1407 1409
@wireprotocommand(
    b'lookup',
    args={
        b'key': {
            b'type': b'bytes',
            b'example': b'foo',
        },
    },
    permission=b'pull',
)
def lookupv2(repo, proto, key):
    """Emit the result of ``repo.lookup()`` for ``key`` (converted with
    ``encoding.tolocal()`` first)."""
    localkey = encoding.tolocal(key)

    # TODO handle exception.
    yield repo.lookup(localkey)
1425 1427
1426 1428
def manifestdatacapabilities(repo, proto):
    """Return the extra capabilities map for the ``manifestdata`` command.

    The map has a single ``recommendedbatchsize`` entry read from the
    ``experimental.server.manifestdata.recommended-batch-size`` config
    option.
    """
    return {
        b'recommendedbatchsize': repo.ui.configint(
            b'experimental', b'server.manifestdata.recommended-batch-size'
        ),
    }
1435 1437
1436 1438
@wireprotocommand(
    b'manifestdata',
    args={
        b'nodes': {
            b'type': b'list',
            b'example': [b'0123456...'],
        },
        b'haveparents': {
            b'type': b'bool',
            b'default': lambda: False,
            b'example': True,
        },
        b'fields': {
            b'type': b'set',
            b'default': set,
            b'example': {b'parents', b'revision'},
            b'validvalues': {b'parents', b'revision'},
        },
        b'tree': {
            b'type': b'bytes',
            b'example': b'',
        },
    },
    permission=b'pull',
    cachekeyfn=makecommandcachekeyfn(b'manifestdata', 1, allargs=True),
    extracapabilitiesfn=manifestdatacapabilities,
)
def manifestdata(repo, proto, haveparents, nodes, fields, tree):
    """Emit manifest revision data for ``nodes`` of the manifest ``tree``.

    Emits ``{b'totalitems': len(nodes)}`` followed, for every revision, by
    a map with ``node`` (plus ``parents`` when requested). When
    ``revision`` is in ``fields`` the map announces its payload through
    ``fieldsfollowing`` and the payload is emitted right after it: the full
    text when available, otherwise the delta (with ``deltabasenode`` added
    to the map).

    Raises ``error.WireprotoCommandError`` when a requested node is not
    known to the manifest storage.
    """
    store = repo.manifestlog.getstorage(tree)

    # Validate the node is known and abort on unknown revisions.
    for node in nodes:
        try:
            store.rev(node)
        except error.LookupError:
            # Report the node in hex form, like filedata does, rather than
            # embedding raw binary in the error message.
            raise error.WireprotoCommandError(
                b'unknown node: %s', (hex(node),)
            )

    revisions = store.emitrevisions(
        nodes,
        revisiondata=b'revision' in fields,
        assumehaveparentrevisions=haveparents,
    )

    yield {
        b'totalitems': len(nodes),
    }

    for revision in revisions:
        d = {
            b'node': revision.node,
        }

        if b'parents' in fields:
            d[b'parents'] = [revision.p1node, revision.p2node]

        # (name, size) pairs describing the raw payloads sent after ``d``.
        followingmeta = []
        followingdata = []

        if b'revision' in fields:
            if revision.revision is not None:
                followingmeta.append((b'revision', len(revision.revision)))
                followingdata.append(revision.revision)
            else:
                d[b'deltabasenode'] = revision.basenode
                followingmeta.append((b'delta', len(revision.delta)))
                followingdata.append(revision.delta)

        if followingmeta:
            d[b'fieldsfollowing'] = followingmeta

        yield d

        for extra in followingdata:
            yield extra
1511 1513
1512 1514
@wireprotocommand(
    b'pushkey',
    args={
        b'namespace': {
            b'type': b'bytes',
            b'example': b'ns',
        },
        b'key': {
            b'type': b'bytes',
            b'example': b'key',
        },
        b'old': {
            b'type': b'bytes',
            b'example': b'old',
        },
        b'new': {
            b'type': b'bytes',
            b'example': b'new',
        },
    },
    permission=b'push',
)
def pushkeyv2(repo, proto, namespace, key, old, new):
    """Emit the result of ``repo.pushkey()`` called with all four arguments
    converted through ``encoding.tolocal()``."""
    # TODO handle ui output redirection
    localargs = [encoding.tolocal(v) for v in (namespace, key, old, new)]

    yield repo.pushkey(*localargs)
1543 1545
1544 1546
@wireprotocommand(
    b'rawstorefiledata',
    args={
        b'files': {
            b'type': b'list',
            b'example': [b'changelog', b'manifestlog'],
        },
        b'pathfilter': {
            b'type': b'list',
            b'default': lambda: None,
            b'example': {b'include': [b'path:tests']},
        },
    },
    permission=b'pull',
)
def rawstorefiledata(repo, proto, files, pathfilter):
    """Emit raw store files for the requested file sets.

    ``files`` names the sets to send; only ``changelog`` and
    ``manifestlog`` are accepted. ``pathfilter`` is accepted but not used
    by this implementation.

    Output is a map with ``filecount`` and ``totalsize``, then for every
    file a map with ``location``, ``path`` and ``size`` followed by an
    indefinite-length bytestring response streaming the file content.

    Raises ``error.WireprotoCommandError`` when stream clones are disabled
    or an unsupported file set is requested.
    """
    if not streamclone.allowservergeneration(repo):
        raise error.WireprotoCommandError(b'stream clone is disabled')

    # TODO support dynamically advertising what store files "sets" are
    # available. For now, we support changelog and manifestlog.
    files = set(files)
    allowedfiles = {b'changelog', b'manifestlog'}

    unsupported = files - allowedfiles
    if unsupported:
        raise error.WireprotoCommandError(
            b'unknown file type: %s', (b', '.join(sorted(unsupported)),)
        )

    # The lock is only held while listing the store; sizes recorded here
    # bound how much of each file is sent later.
    with repo.lock():
        topfiles = list(repo.store.topfiles())

    sendfiles = []
    totalsize = 0

    # TODO this is a bunch of storage layer interface abstractions because
    # it assumes revlogs.
    for name, encodedname, size in topfiles:
        wanted = (
            b'changelog' in files and name.startswith(b'00changelog')
        ) or (b'manifestlog' in files and name.startswith(b'00manifest'))

        if not wanted:
            continue

        sendfiles.append((b'store', name, size))
        totalsize += size

    yield {
        b'filecount': len(sendfiles),
        b'totalsize': totalsize,
    }

    # We have to use a generator function for this to ensure the context
    # manager is closed only after sending the final chunk. The file name
    # and size are passed explicitly so each generator is bound to its own
    # file, regardless of when the consumer iterates it relative to the
    # loop below.
    def getfiledata(name, size):
        with repo.svfs(name, b'rb', auditpath=False) as fh:
            for chunk in util.filechunkiter(fh, limit=size):
                yield chunk

    for location, name, size in sendfiles:
        yield {
            b'location': location,
            b'path': name,
            b'size': size,
        }

        yield wireprototypes.indefinitebytestringresponse(
            getfiledata(name, size)
        )
General Comments 0
You need to be logged in to leave comments. Login now