##// END OF EJS Templates
wireprotov2: add support for more response types...
Gregory Szorc -
r37746:564a3eec default
parent child Browse files
Show More
@@ -1,1078 +1,1167
1 1 # wireprotoframing.py - unified framing protocol for wire protocol
2 2 #
3 3 # Copyright 2018 Gregory Szorc <gregory.szorc@gmail.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 # This file contains functionality to support the unified frame-based wire
9 9 # protocol. For details about the protocol, see
10 10 # `hg help internals.wireprotocol`.
11 11
12 12 from __future__ import absolute_import
13 13
14 14 import collections
15 15 import struct
16 16
17 17 from .i18n import _
18 18 from .thirdparty import (
19 19 attr,
20 20 cbor,
21 21 )
22 22 from . import (
23 23 encoding,
24 24 error,
25 25 util,
26 26 )
27 27 from .utils import (
28 28 stringutil,
29 29 )
30 30
31 31 FRAME_HEADER_SIZE = 8
32 32 DEFAULT_MAX_FRAME_SIZE = 32768
33 33
34 34 STREAM_FLAG_BEGIN_STREAM = 0x01
35 35 STREAM_FLAG_END_STREAM = 0x02
36 36 STREAM_FLAG_ENCODING_APPLIED = 0x04
37 37
38 38 STREAM_FLAGS = {
39 39 b'stream-begin': STREAM_FLAG_BEGIN_STREAM,
40 40 b'stream-end': STREAM_FLAG_END_STREAM,
41 41 b'encoded': STREAM_FLAG_ENCODING_APPLIED,
42 42 }
43 43
44 44 FRAME_TYPE_COMMAND_REQUEST = 0x01
45 45 FRAME_TYPE_COMMAND_DATA = 0x02
46 46 FRAME_TYPE_COMMAND_RESPONSE = 0x03
47 47 FRAME_TYPE_ERROR_RESPONSE = 0x05
48 48 FRAME_TYPE_TEXT_OUTPUT = 0x06
49 49 FRAME_TYPE_PROGRESS = 0x07
50 50 FRAME_TYPE_STREAM_SETTINGS = 0x08
51 51
52 52 FRAME_TYPES = {
53 53 b'command-request': FRAME_TYPE_COMMAND_REQUEST,
54 54 b'command-data': FRAME_TYPE_COMMAND_DATA,
55 55 b'command-response': FRAME_TYPE_COMMAND_RESPONSE,
56 56 b'error-response': FRAME_TYPE_ERROR_RESPONSE,
57 57 b'text-output': FRAME_TYPE_TEXT_OUTPUT,
58 58 b'progress': FRAME_TYPE_PROGRESS,
59 59 b'stream-settings': FRAME_TYPE_STREAM_SETTINGS,
60 60 }
61 61
62 62 FLAG_COMMAND_REQUEST_NEW = 0x01
63 63 FLAG_COMMAND_REQUEST_CONTINUATION = 0x02
64 64 FLAG_COMMAND_REQUEST_MORE_FRAMES = 0x04
65 65 FLAG_COMMAND_REQUEST_EXPECT_DATA = 0x08
66 66
67 67 FLAGS_COMMAND_REQUEST = {
68 68 b'new': FLAG_COMMAND_REQUEST_NEW,
69 69 b'continuation': FLAG_COMMAND_REQUEST_CONTINUATION,
70 70 b'more': FLAG_COMMAND_REQUEST_MORE_FRAMES,
71 71 b'have-data': FLAG_COMMAND_REQUEST_EXPECT_DATA,
72 72 }
73 73
74 74 FLAG_COMMAND_DATA_CONTINUATION = 0x01
75 75 FLAG_COMMAND_DATA_EOS = 0x02
76 76
77 77 FLAGS_COMMAND_DATA = {
78 78 b'continuation': FLAG_COMMAND_DATA_CONTINUATION,
79 79 b'eos': FLAG_COMMAND_DATA_EOS,
80 80 }
81 81
82 82 FLAG_COMMAND_RESPONSE_CONTINUATION = 0x01
83 83 FLAG_COMMAND_RESPONSE_EOS = 0x02
84 84
85 85 FLAGS_COMMAND_RESPONSE = {
86 86 b'continuation': FLAG_COMMAND_RESPONSE_CONTINUATION,
87 87 b'eos': FLAG_COMMAND_RESPONSE_EOS,
88 88 }
89 89
90 90 # Maps frame types to their available flags.
91 91 FRAME_TYPE_FLAGS = {
92 92 FRAME_TYPE_COMMAND_REQUEST: FLAGS_COMMAND_REQUEST,
93 93 FRAME_TYPE_COMMAND_DATA: FLAGS_COMMAND_DATA,
94 94 FRAME_TYPE_COMMAND_RESPONSE: FLAGS_COMMAND_RESPONSE,
95 95 FRAME_TYPE_ERROR_RESPONSE: {},
96 96 FRAME_TYPE_TEXT_OUTPUT: {},
97 97 FRAME_TYPE_PROGRESS: {},
98 98 FRAME_TYPE_STREAM_SETTINGS: {},
99 99 }
100 100
101 101 ARGUMENT_RECORD_HEADER = struct.Struct(r'<HH')
102 102
103 103 def humanflags(mapping, value):
104 104 """Convert a numeric flags value to a human value, using a mapping table."""
105 105 namemap = {v: k for k, v in mapping.iteritems()}
106 106 flags = []
107 107 val = 1
108 108 while value >= val:
109 109 if value & val:
110 110 flags.append(namemap.get(val, '<unknown 0x%02x>' % val))
111 111 val <<= 1
112 112
113 113 return b'|'.join(flags)
114 114
115 115 @attr.s(slots=True)
116 116 class frameheader(object):
117 117 """Represents the data in a frame header."""
118 118
119 119 length = attr.ib()
120 120 requestid = attr.ib()
121 121 streamid = attr.ib()
122 122 streamflags = attr.ib()
123 123 typeid = attr.ib()
124 124 flags = attr.ib()
125 125
126 126 @attr.s(slots=True, repr=False)
127 127 class frame(object):
128 128 """Represents a parsed frame."""
129 129
130 130 requestid = attr.ib()
131 131 streamid = attr.ib()
132 132 streamflags = attr.ib()
133 133 typeid = attr.ib()
134 134 flags = attr.ib()
135 135 payload = attr.ib()
136 136
137 137 @encoding.strmethod
138 138 def __repr__(self):
139 139 typename = '<unknown 0x%02x>' % self.typeid
140 140 for name, value in FRAME_TYPES.iteritems():
141 141 if value == self.typeid:
142 142 typename = name
143 143 break
144 144
145 145 return ('frame(size=%d; request=%d; stream=%d; streamflags=%s; '
146 146 'type=%s; flags=%s)' % (
147 147 len(self.payload), self.requestid, self.streamid,
148 148 humanflags(STREAM_FLAGS, self.streamflags), typename,
149 149 humanflags(FRAME_TYPE_FLAGS.get(self.typeid, {}), self.flags)))
150 150
151 151 def makeframe(requestid, streamid, streamflags, typeid, flags, payload):
152 152 """Assemble a frame into a byte array."""
153 153 # TODO assert size of payload.
154 154 frame = bytearray(FRAME_HEADER_SIZE + len(payload))
155 155
156 156 # 24 bits length
157 157 # 16 bits request id
158 158 # 8 bits stream id
159 159 # 8 bits stream flags
160 160 # 4 bits type
161 161 # 4 bits flags
162 162
163 163 l = struct.pack(r'<I', len(payload))
164 164 frame[0:3] = l[0:3]
165 165 struct.pack_into(r'<HBB', frame, 3, requestid, streamid, streamflags)
166 166 frame[7] = (typeid << 4) | flags
167 167 frame[8:] = payload
168 168
169 169 return frame
170 170
171 171 def makeframefromhumanstring(s):
172 172 """Create a frame from a human readable string
173 173
174 174 Strings have the form:
175 175
176 176 <request-id> <stream-id> <stream-flags> <type> <flags> <payload>
177 177
178 178 This can be used by user-facing applications and tests for creating
179 179 frames easily without having to type out a bunch of constants.
180 180
181 181 Request ID and stream IDs are integers.
182 182
183 183 Stream flags, frame type, and flags can be specified by integer or
184 184 named constant.
185 185
186 186 Flags can be delimited by `|` to bitwise OR them together.
187 187
188 188 If the payload begins with ``cbor:``, the following string will be
189 189 evaluated as Python literal and the resulting object will be fed into
190 190 a CBOR encoder. Otherwise, the payload is interpreted as a Python
191 191 byte string literal.
192 192 """
193 193 fields = s.split(b' ', 5)
194 194 requestid, streamid, streamflags, frametype, frameflags, payload = fields
195 195
196 196 requestid = int(requestid)
197 197 streamid = int(streamid)
198 198
199 199 finalstreamflags = 0
200 200 for flag in streamflags.split(b'|'):
201 201 if flag in STREAM_FLAGS:
202 202 finalstreamflags |= STREAM_FLAGS[flag]
203 203 else:
204 204 finalstreamflags |= int(flag)
205 205
206 206 if frametype in FRAME_TYPES:
207 207 frametype = FRAME_TYPES[frametype]
208 208 else:
209 209 frametype = int(frametype)
210 210
211 211 finalflags = 0
212 212 validflags = FRAME_TYPE_FLAGS[frametype]
213 213 for flag in frameflags.split(b'|'):
214 214 if flag in validflags:
215 215 finalflags |= validflags[flag]
216 216 else:
217 217 finalflags |= int(flag)
218 218
219 219 if payload.startswith(b'cbor:'):
220 220 payload = cbor.dumps(stringutil.evalpythonliteral(payload[5:]),
221 221 canonical=True)
222 222
223 223 else:
224 224 payload = stringutil.unescapestr(payload)
225 225
226 226 return makeframe(requestid=requestid, streamid=streamid,
227 227 streamflags=finalstreamflags, typeid=frametype,
228 228 flags=finalflags, payload=payload)
229 229
230 230 def parseheader(data):
231 231 """Parse a unified framing protocol frame header from a buffer.
232 232
233 233 The header is expected to be in the buffer at offset 0 and the
234 234 buffer is expected to be large enough to hold a full header.
235 235 """
236 236 # 24 bits payload length (little endian)
237 237 # 16 bits request ID
238 238 # 8 bits stream ID
239 239 # 8 bits stream flags
240 240 # 4 bits frame type
241 241 # 4 bits frame flags
242 242 # ... payload
243 243 framelength = data[0] + 256 * data[1] + 16384 * data[2]
244 244 requestid, streamid, streamflags = struct.unpack_from(r'<HBB', data, 3)
245 245 typeflags = data[7]
246 246
247 247 frametype = (typeflags & 0xf0) >> 4
248 248 frameflags = typeflags & 0x0f
249 249
250 250 return frameheader(framelength, requestid, streamid, streamflags,
251 251 frametype, frameflags)
252 252
253 253 def readframe(fh):
254 254 """Read a unified framing protocol frame from a file object.
255 255
256 256 Returns a 3-tuple of (type, flags, payload) for the decoded frame or
257 257 None if no frame is available. May raise if a malformed frame is
258 258 seen.
259 259 """
260 260 header = bytearray(FRAME_HEADER_SIZE)
261 261
262 262 readcount = fh.readinto(header)
263 263
264 264 if readcount == 0:
265 265 return None
266 266
267 267 if readcount != FRAME_HEADER_SIZE:
268 268 raise error.Abort(_('received incomplete frame: got %d bytes: %s') %
269 269 (readcount, header))
270 270
271 271 h = parseheader(header)
272 272
273 273 payload = fh.read(h.length)
274 274 if len(payload) != h.length:
275 275 raise error.Abort(_('frame length error: expected %d; got %d') %
276 276 (h.length, len(payload)))
277 277
278 278 return frame(h.requestid, h.streamid, h.streamflags, h.typeid, h.flags,
279 279 payload)
280 280
281 281 def createcommandframes(stream, requestid, cmd, args, datafh=None,
282 282 maxframesize=DEFAULT_MAX_FRAME_SIZE):
283 283 """Create frames necessary to transmit a request to run a command.
284 284
285 285 This is a generator of bytearrays. Each item represents a frame
286 286 ready to be sent over the wire to a peer.
287 287 """
288 288 data = {b'name': cmd}
289 289 if args:
290 290 data[b'args'] = args
291 291
292 292 data = cbor.dumps(data, canonical=True)
293 293
294 294 offset = 0
295 295
296 296 while True:
297 297 flags = 0
298 298
299 299 # Must set new or continuation flag.
300 300 if not offset:
301 301 flags |= FLAG_COMMAND_REQUEST_NEW
302 302 else:
303 303 flags |= FLAG_COMMAND_REQUEST_CONTINUATION
304 304
305 305 # Data frames is set on all frames.
306 306 if datafh:
307 307 flags |= FLAG_COMMAND_REQUEST_EXPECT_DATA
308 308
309 309 payload = data[offset:offset + maxframesize]
310 310 offset += len(payload)
311 311
312 312 if len(payload) == maxframesize and offset < len(data):
313 313 flags |= FLAG_COMMAND_REQUEST_MORE_FRAMES
314 314
315 315 yield stream.makeframe(requestid=requestid,
316 316 typeid=FRAME_TYPE_COMMAND_REQUEST,
317 317 flags=flags,
318 318 payload=payload)
319 319
320 320 if not (flags & FLAG_COMMAND_REQUEST_MORE_FRAMES):
321 321 break
322 322
323 323 if datafh:
324 324 while True:
325 325 data = datafh.read(DEFAULT_MAX_FRAME_SIZE)
326 326
327 327 done = False
328 328 if len(data) == DEFAULT_MAX_FRAME_SIZE:
329 329 flags = FLAG_COMMAND_DATA_CONTINUATION
330 330 else:
331 331 flags = FLAG_COMMAND_DATA_EOS
332 332 assert datafh.read(1) == b''
333 333 done = True
334 334
335 335 yield stream.makeframe(requestid=requestid,
336 336 typeid=FRAME_TYPE_COMMAND_DATA,
337 337 flags=flags,
338 338 payload=data)
339 339
340 340 if done:
341 341 break
342 342
343 343 def createcommandresponseframesfrombytes(stream, requestid, data,
344 344 maxframesize=DEFAULT_MAX_FRAME_SIZE):
345 345 """Create a raw frame to send a bytes response from static bytes input.
346 346
347 347 Returns a generator of bytearrays.
348 348 """
349 349 # Automatically send the overall CBOR response map.
350 350 overall = cbor.dumps({b'status': b'ok'}, canonical=True)
351 351 if len(overall) > maxframesize:
352 352 raise error.ProgrammingError('not yet implemented')
353 353
354 354 # Simple case where we can fit the full response in a single frame.
355 355 if len(overall) + len(data) <= maxframesize:
356 356 flags = FLAG_COMMAND_RESPONSE_EOS
357 357 yield stream.makeframe(requestid=requestid,
358 358 typeid=FRAME_TYPE_COMMAND_RESPONSE,
359 359 flags=flags,
360 360 payload=overall + data)
361 361 return
362 362
363 363 # It's easier to send the overall CBOR map in its own frame than to track
364 364 # offsets.
365 365 yield stream.makeframe(requestid=requestid,
366 366 typeid=FRAME_TYPE_COMMAND_RESPONSE,
367 367 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
368 368 payload=overall)
369 369
370 370 offset = 0
371 371 while True:
372 372 chunk = data[offset:offset + maxframesize]
373 373 offset += len(chunk)
374 374 done = offset == len(data)
375 375
376 376 if done:
377 377 flags = FLAG_COMMAND_RESPONSE_EOS
378 378 else:
379 379 flags = FLAG_COMMAND_RESPONSE_CONTINUATION
380 380
381 381 yield stream.makeframe(requestid=requestid,
382 382 typeid=FRAME_TYPE_COMMAND_RESPONSE,
383 383 flags=flags,
384 384 payload=chunk)
385 385
386 386 if done:
387 387 break
388 388
389 def createbytesresponseframesfromgen(stream, requestid, gen,
390 maxframesize=DEFAULT_MAX_FRAME_SIZE):
391 overall = cbor.dumps({b'status': b'ok'}, canonical=True)
392
393 yield stream.makeframe(requestid=requestid,
394 typeid=FRAME_TYPE_COMMAND_RESPONSE,
395 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
396 payload=overall)
397
398 cb = util.chunkbuffer(gen)
399
400 flags = 0
401
402 while True:
403 chunk = cb.read(maxframesize)
404 if not chunk:
405 break
406
407 yield stream.makeframe(requestid=requestid,
408 typeid=FRAME_TYPE_COMMAND_RESPONSE,
409 flags=flags,
410 payload=chunk)
411
412 flags |= FLAG_COMMAND_RESPONSE_CONTINUATION
413
414 flags ^= FLAG_COMMAND_RESPONSE_CONTINUATION
415 flags |= FLAG_COMMAND_RESPONSE_EOS
416 yield stream.makeframe(requestid=requestid,
417 typeid=FRAME_TYPE_COMMAND_RESPONSE,
418 flags=flags,
419 payload=b'')
420
421 def createcommanderrorresponse(stream, requestid, message, args=None):
422 m = {
423 b'status': b'error',
424 b'error': {
425 b'message': message,
426 }
427 }
428
429 if args:
430 m[b'error'][b'args'] = args
431
432 overall = cbor.dumps(m, canonical=True)
433
434 yield stream.makeframe(requestid=requestid,
435 typeid=FRAME_TYPE_COMMAND_RESPONSE,
436 flags=FLAG_COMMAND_RESPONSE_EOS,
437 payload=overall)
438
389 439 def createerrorframe(stream, requestid, msg, errtype):
390 440 # TODO properly handle frame size limits.
391 441 assert len(msg) <= DEFAULT_MAX_FRAME_SIZE
392 442
393 443 payload = cbor.dumps({
394 444 b'type': errtype,
395 445 b'message': [{b'msg': msg}],
396 446 }, canonical=True)
397 447
398 448 yield stream.makeframe(requestid=requestid,
399 449 typeid=FRAME_TYPE_ERROR_RESPONSE,
400 450 flags=0,
401 451 payload=payload)
402 452
403 453 def createtextoutputframe(stream, requestid, atoms,
404 454 maxframesize=DEFAULT_MAX_FRAME_SIZE):
405 455 """Create a text output frame to render text to people.
406 456
407 457 ``atoms`` is a 3-tuple of (formatting string, args, labels).
408 458
409 459 The formatting string contains ``%s`` tokens to be replaced by the
410 460 corresponding indexed entry in ``args``. ``labels`` is an iterable of
411 461 formatters to be applied at rendering time. In terms of the ``ui``
412 462 class, each atom corresponds to a ``ui.write()``.
413 463 """
414 464 atomdicts = []
415 465
416 466 for (formatting, args, labels) in atoms:
417 467 # TODO look for localstr, other types here?
418 468
419 469 if not isinstance(formatting, bytes):
420 470 raise ValueError('must use bytes formatting strings')
421 471 for arg in args:
422 472 if not isinstance(arg, bytes):
423 473 raise ValueError('must use bytes for arguments')
424 474 for label in labels:
425 475 if not isinstance(label, bytes):
426 476 raise ValueError('must use bytes for labels')
427 477
428 478 # Formatting string must be ASCII.
429 479 formatting = formatting.decode(r'ascii', r'replace').encode(r'ascii')
430 480
431 481 # Arguments must be UTF-8.
432 482 args = [a.decode(r'utf-8', r'replace').encode(r'utf-8') for a in args]
433 483
434 484 # Labels must be ASCII.
435 485 labels = [l.decode(r'ascii', r'strict').encode(r'ascii')
436 486 for l in labels]
437 487
438 488 atom = {b'msg': formatting}
439 489 if args:
440 490 atom[b'args'] = args
441 491 if labels:
442 492 atom[b'labels'] = labels
443 493
444 494 atomdicts.append(atom)
445 495
446 496 payload = cbor.dumps(atomdicts, canonical=True)
447 497
448 498 if len(payload) > maxframesize:
449 499 raise ValueError('cannot encode data in a single frame')
450 500
451 501 yield stream.makeframe(requestid=requestid,
452 502 typeid=FRAME_TYPE_TEXT_OUTPUT,
453 503 flags=0,
454 504 payload=payload)
455 505
456 506 class stream(object):
457 507 """Represents a logical unidirectional series of frames."""
458 508
459 509 def __init__(self, streamid, active=False):
460 510 self.streamid = streamid
461 511 self._active = active
462 512
463 513 def makeframe(self, requestid, typeid, flags, payload):
464 514 """Create a frame to be sent out over this stream.
465 515
466 516 Only returns the frame instance. Does not actually send it.
467 517 """
468 518 streamflags = 0
469 519 if not self._active:
470 520 streamflags |= STREAM_FLAG_BEGIN_STREAM
471 521 self._active = True
472 522
473 523 return makeframe(requestid, self.streamid, streamflags, typeid, flags,
474 524 payload)
475 525
476 526 def ensureserverstream(stream):
477 527 if stream.streamid % 2:
478 528 raise error.ProgrammingError('server should only write to even '
479 529 'numbered streams; %d is not even' %
480 530 stream.streamid)
481 531
482 532 class serverreactor(object):
483 533 """Holds state of a server handling frame-based protocol requests.
484 534
485 535 This class is the "brain" of the unified frame-based protocol server
486 536 component. While the protocol is stateless from the perspective of
487 537 requests/commands, something needs to track which frames have been
488 538 received, what frames to expect, etc. This class is that thing.
489 539
490 540 Instances are modeled as a state machine of sorts. Instances are also
491 541 reactionary to external events. The point of this class is to encapsulate
492 542 the state of the connection and the exchange of frames, not to perform
493 543 work. Instead, callers tell this class when something occurs, like a
494 544 frame arriving. If that activity is worthy of a follow-up action (say
495 545 *run a command*), the return value of that handler will say so.
496 546
497 547 I/O and CPU intensive operations are purposefully delegated outside of
498 548 this class.
499 549
500 550 Consumers are expected to tell instances when events occur. They do so by
501 551 calling the various ``on*`` methods. These methods return a 2-tuple
502 552 describing any follow-up action(s) to take. The first element is the
503 553 name of an action to perform. The second is a data structure (usually
504 554 a dict) specific to that action that contains more information. e.g.
505 555 if the server wants to send frames back to the client, the data structure
506 556 will contain a reference to those frames.
507 557
508 558 Valid actions that consumers can be instructed to take are:
509 559
510 560 sendframes
511 561 Indicates that frames should be sent to the client. The ``framegen``
512 562 key contains a generator of frames that should be sent. The server
513 563 assumes that all frames are sent to the client.
514 564
515 565 error
516 566 Indicates that an error occurred. Consumer should probably abort.
517 567
518 568 runcommand
519 569 Indicates that the consumer should run a wire protocol command. Details
520 570 of the command to run are given in the data structure.
521 571
522 572 wantframe
523 573 Indicates that nothing of interest happened and the server is waiting on
524 574 more frames from the client before anything interesting can be done.
525 575
526 576 noop
527 577 Indicates no additional action is required.
528 578
529 579 Known Issues
530 580 ------------
531 581
532 582 There are no limits to the number of partially received commands or their
533 583 size. A malicious client could stream command request data and exhaust the
534 584 server's memory.
535 585
536 586 Partially received commands are not acted upon when end of input is
537 587 reached. Should the server error if it receives a partial request?
538 588 Should the client send a message to abort a partially transmitted request
539 589 to facilitate graceful shutdown?
540 590
541 591 Active requests that haven't been responded to aren't tracked. This means
542 592 that if we receive a command and instruct its dispatch, another command
543 593 with its request ID can come in over the wire and there will be a race
544 594 between who responds to what.
545 595 """
546 596
547 597 def __init__(self, deferoutput=False):
548 598 """Construct a new server reactor.
549 599
550 600 ``deferoutput`` can be used to indicate that no output frames should be
551 601 instructed to be sent until input has been exhausted. In this mode,
552 602 events that would normally generate output frames (such as a command
553 603 response being ready) will instead defer instructing the consumer to
554 604 send those frames. This is useful for half-duplex transports where the
555 605 sender cannot receive until all data has been transmitted.
556 606 """
557 607 self._deferoutput = deferoutput
558 608 self._state = 'idle'
559 609 self._nextoutgoingstreamid = 2
560 610 self._bufferedframegens = []
561 611 # stream id -> stream instance for all active streams from the client.
562 612 self._incomingstreams = {}
563 613 self._outgoingstreams = {}
564 614 # request id -> dict of commands that are actively being received.
565 615 self._receivingcommands = {}
566 616 # Request IDs that have been received and are actively being processed.
567 617 # Once all output for a request has been sent, it is removed from this
568 618 # set.
569 619 self._activecommands = set()
570 620
571 621 def onframerecv(self, frame):
572 622 """Process a frame that has been received off the wire.
573 623
574 624 Returns a dict with an ``action`` key that details what action,
575 625 if any, the consumer should take next.
576 626 """
577 627 if not frame.streamid % 2:
578 628 self._state = 'errored'
579 629 return self._makeerrorresult(
580 630 _('received frame with even numbered stream ID: %d') %
581 631 frame.streamid)
582 632
583 633 if frame.streamid not in self._incomingstreams:
584 634 if not frame.streamflags & STREAM_FLAG_BEGIN_STREAM:
585 635 self._state = 'errored'
586 636 return self._makeerrorresult(
587 637 _('received frame on unknown inactive stream without '
588 638 'beginning of stream flag set'))
589 639
590 640 self._incomingstreams[frame.streamid] = stream(frame.streamid)
591 641
592 642 if frame.streamflags & STREAM_FLAG_ENCODING_APPLIED:
593 643 # TODO handle decoding frames
594 644 self._state = 'errored'
595 645 raise error.ProgrammingError('support for decoding stream payloads '
596 646 'not yet implemented')
597 647
598 648 if frame.streamflags & STREAM_FLAG_END_STREAM:
599 649 del self._incomingstreams[frame.streamid]
600 650
601 651 handlers = {
602 652 'idle': self._onframeidle,
603 653 'command-receiving': self._onframecommandreceiving,
604 654 'errored': self._onframeerrored,
605 655 }
606 656
607 657 meth = handlers.get(self._state)
608 658 if not meth:
609 659 raise error.ProgrammingError('unhandled state: %s' % self._state)
610 660
611 661 return meth(frame)
612 662
613 663 def oncommandresponseready(self, stream, requestid, data):
614 664 """Signal that a bytes response is ready to be sent to the client.
615 665
616 666 The raw bytes response is passed as an argument.
617 667 """
618 668 ensureserverstream(stream)
619 669
620 670 def sendframes():
621 671 for frame in createcommandresponseframesfrombytes(stream, requestid,
622 672 data):
623 673 yield frame
624 674
625 675 self._activecommands.remove(requestid)
626 676
627 677 result = sendframes()
628 678
629 679 if self._deferoutput:
630 680 self._bufferedframegens.append(result)
631 681 return 'noop', {}
632 682 else:
633 683 return 'sendframes', {
634 684 'framegen': result,
635 685 }
636 686
687 def oncommandresponsereadygen(self, stream, requestid, gen):
688 """Signal that a bytes response is ready, with data as a generator."""
689 ensureserverstream(stream)
690
691 def sendframes():
692 for frame in createbytesresponseframesfromgen(stream, requestid,
693 gen):
694 yield frame
695
696 self._activecommands.remove(requestid)
697
698 return self._handlesendframes(sendframes())
699
637 700 def oninputeof(self):
638 701 """Signals that end of input has been received.
639 702
640 703 No more frames will be received. All pending activity should be
641 704 completed.
642 705 """
643 706 # TODO should we do anything about in-flight commands?
644 707
645 708 if not self._deferoutput or not self._bufferedframegens:
646 709 return 'noop', {}
647 710
648 711 # If we buffered all our responses, emit those.
649 712 def makegen():
650 713 for gen in self._bufferedframegens:
651 714 for frame in gen:
652 715 yield frame
653 716
654 717 return 'sendframes', {
655 718 'framegen': makegen(),
656 719 }
657 720
721 def _handlesendframes(self, framegen):
722 if self._deferoutput:
723 self._bufferedframegens.append(framegen)
724 return 'noop', {}
725 else:
726 return 'sendframes', {
727 'framegen': framegen,
728 }
729
658 730 def onservererror(self, stream, requestid, msg):
659 731 ensureserverstream(stream)
660 732
661 return 'sendframes', {
662 'framegen': createerrorframe(stream, requestid, msg,
663 errtype='server'),
664 }
733 def sendframes():
734 for frame in createerrorframe(stream, requestid, msg,
735 errtype='server'):
736 yield frame
737
738 self._activecommands.remove(requestid)
739
740 return self._handlesendframes(sendframes())
741
742 def oncommanderror(self, stream, requestid, message, args=None):
743 """Called when a command encountered an error before sending output."""
744 ensureserverstream(stream)
745
746 def sendframes():
747 for frame in createcommanderrorresponse(stream, requestid, message,
748 args):
749 yield frame
750
751 self._activecommands.remove(requestid)
752
753 return self._handlesendframes(sendframes())
665 754
666 755 def makeoutputstream(self):
667 756 """Create a stream to be used for sending data to the client."""
668 757 streamid = self._nextoutgoingstreamid
669 758 self._nextoutgoingstreamid += 2
670 759
671 760 s = stream(streamid)
672 761 self._outgoingstreams[streamid] = s
673 762
674 763 return s
675 764
676 765 def _makeerrorresult(self, msg):
677 766 return 'error', {
678 767 'message': msg,
679 768 }
680 769
681 770 def _makeruncommandresult(self, requestid):
682 771 entry = self._receivingcommands[requestid]
683 772
684 773 if not entry['requestdone']:
685 774 self._state = 'errored'
686 775 raise error.ProgrammingError('should not be called without '
687 776 'requestdone set')
688 777
689 778 del self._receivingcommands[requestid]
690 779
691 780 if self._receivingcommands:
692 781 self._state = 'command-receiving'
693 782 else:
694 783 self._state = 'idle'
695 784
696 785 # Decode the payloads as CBOR.
697 786 entry['payload'].seek(0)
698 787 request = cbor.load(entry['payload'])
699 788
700 789 if b'name' not in request:
701 790 self._state = 'errored'
702 791 return self._makeerrorresult(
703 792 _('command request missing "name" field'))
704 793
705 794 if b'args' not in request:
706 795 request[b'args'] = {}
707 796
708 797 assert requestid not in self._activecommands
709 798 self._activecommands.add(requestid)
710 799
711 800 return 'runcommand', {
712 801 'requestid': requestid,
713 802 'command': request[b'name'],
714 803 'args': request[b'args'],
715 804 'data': entry['data'].getvalue() if entry['data'] else None,
716 805 }
717 806
718 807 def _makewantframeresult(self):
719 808 return 'wantframe', {
720 809 'state': self._state,
721 810 }
722 811
723 812 def _validatecommandrequestframe(self, frame):
724 813 new = frame.flags & FLAG_COMMAND_REQUEST_NEW
725 814 continuation = frame.flags & FLAG_COMMAND_REQUEST_CONTINUATION
726 815
727 816 if new and continuation:
728 817 self._state = 'errored'
729 818 return self._makeerrorresult(
730 819 _('received command request frame with both new and '
731 820 'continuation flags set'))
732 821
733 822 if not new and not continuation:
734 823 self._state = 'errored'
735 824 return self._makeerrorresult(
736 825 _('received command request frame with neither new nor '
737 826 'continuation flags set'))
738 827
739 828 def _onframeidle(self, frame):
740 829 # The only frame type that should be received in this state is a
741 830 # command request.
742 831 if frame.typeid != FRAME_TYPE_COMMAND_REQUEST:
743 832 self._state = 'errored'
744 833 return self._makeerrorresult(
745 834 _('expected command request frame; got %d') % frame.typeid)
746 835
747 836 res = self._validatecommandrequestframe(frame)
748 837 if res:
749 838 return res
750 839
751 840 if frame.requestid in self._receivingcommands:
752 841 self._state = 'errored'
753 842 return self._makeerrorresult(
754 843 _('request with ID %d already received') % frame.requestid)
755 844
756 845 if frame.requestid in self._activecommands:
757 846 self._state = 'errored'
758 847 return self._makeerrorresult(
759 848 _('request with ID %d is already active') % frame.requestid)
760 849
761 850 new = frame.flags & FLAG_COMMAND_REQUEST_NEW
762 851 moreframes = frame.flags & FLAG_COMMAND_REQUEST_MORE_FRAMES
763 852 expectingdata = frame.flags & FLAG_COMMAND_REQUEST_EXPECT_DATA
764 853
765 854 if not new:
766 855 self._state = 'errored'
767 856 return self._makeerrorresult(
768 857 _('received command request frame without new flag set'))
769 858
770 859 payload = util.bytesio()
771 860 payload.write(frame.payload)
772 861
773 862 self._receivingcommands[frame.requestid] = {
774 863 'payload': payload,
775 864 'data': None,
776 865 'requestdone': not moreframes,
777 866 'expectingdata': bool(expectingdata),
778 867 }
779 868
780 869 # This is the final frame for this request. Dispatch it.
781 870 if not moreframes and not expectingdata:
782 871 return self._makeruncommandresult(frame.requestid)
783 872
784 873 assert moreframes or expectingdata
785 874 self._state = 'command-receiving'
786 875 return self._makewantframeresult()
787 876
788 877 def _onframecommandreceiving(self, frame):
789 878 if frame.typeid == FRAME_TYPE_COMMAND_REQUEST:
790 879 # Process new command requests as such.
791 880 if frame.flags & FLAG_COMMAND_REQUEST_NEW:
792 881 return self._onframeidle(frame)
793 882
794 883 res = self._validatecommandrequestframe(frame)
795 884 if res:
796 885 return res
797 886
798 887 # All other frames should be related to a command that is currently
799 888 # receiving but is not active.
800 889 if frame.requestid in self._activecommands:
801 890 self._state = 'errored'
802 891 return self._makeerrorresult(
803 892 _('received frame for request that is still active: %d') %
804 893 frame.requestid)
805 894
806 895 if frame.requestid not in self._receivingcommands:
807 896 self._state = 'errored'
808 897 return self._makeerrorresult(
809 898 _('received frame for request that is not receiving: %d') %
810 899 frame.requestid)
811 900
812 901 entry = self._receivingcommands[frame.requestid]
813 902
814 903 if frame.typeid == FRAME_TYPE_COMMAND_REQUEST:
815 904 moreframes = frame.flags & FLAG_COMMAND_REQUEST_MORE_FRAMES
816 905 expectingdata = bool(frame.flags & FLAG_COMMAND_REQUEST_EXPECT_DATA)
817 906
818 907 if entry['requestdone']:
819 908 self._state = 'errored'
820 909 return self._makeerrorresult(
821 910 _('received command request frame when request frames '
822 911 'were supposedly done'))
823 912
824 913 if expectingdata != entry['expectingdata']:
825 914 self._state = 'errored'
826 915 return self._makeerrorresult(
827 916 _('mismatch between expect data flag and previous frame'))
828 917
829 918 entry['payload'].write(frame.payload)
830 919
831 920 if not moreframes:
832 921 entry['requestdone'] = True
833 922
834 923 if not moreframes and not expectingdata:
835 924 return self._makeruncommandresult(frame.requestid)
836 925
837 926 return self._makewantframeresult()
838 927
839 928 elif frame.typeid == FRAME_TYPE_COMMAND_DATA:
840 929 if not entry['expectingdata']:
841 930 self._state = 'errored'
842 931 return self._makeerrorresult(_(
843 932 'received command data frame for request that is not '
844 933 'expecting data: %d') % frame.requestid)
845 934
846 935 if entry['data'] is None:
847 936 entry['data'] = util.bytesio()
848 937
849 938 return self._handlecommanddataframe(frame, entry)
850 939 else:
851 940 self._state = 'errored'
852 941 return self._makeerrorresult(_(
853 942 'received unexpected frame type: %d') % frame.typeid)
854 943
855 944 def _handlecommanddataframe(self, frame, entry):
856 945 assert frame.typeid == FRAME_TYPE_COMMAND_DATA
857 946
858 947 # TODO support streaming data instead of buffering it.
859 948 entry['data'].write(frame.payload)
860 949
861 950 if frame.flags & FLAG_COMMAND_DATA_CONTINUATION:
862 951 return self._makewantframeresult()
863 952 elif frame.flags & FLAG_COMMAND_DATA_EOS:
864 953 entry['data'].seek(0)
865 954 return self._makeruncommandresult(frame.requestid)
866 955 else:
867 956 self._state = 'errored'
868 957 return self._makeerrorresult(_('command data frame without '
869 958 'flags'))
870 959
871 960 def _onframeerrored(self, frame):
872 961 return self._makeerrorresult(_('server already errored'))
873 962
874 963 class commandrequest(object):
875 964 """Represents a request to run a command."""
876 965
877 966 def __init__(self, requestid, name, args, datafh=None):
878 967 self.requestid = requestid
879 968 self.name = name
880 969 self.args = args
881 970 self.datafh = datafh
882 971 self.state = 'pending'
883 972
884 973 class clientreactor(object):
885 974 """Holds state of a client issuing frame-based protocol requests.
886 975
887 976 This is like ``serverreactor`` but for client-side state.
888 977
889 978 Each instance is bound to the lifetime of a connection. For persistent
890 979 connection transports using e.g. TCP sockets and speaking the raw
891 980 framing protocol, there will be a single instance for the lifetime of
892 981 the TCP socket. For transports where there are multiple discrete
893 982 interactions (say tunneled within in HTTP request), there will be a
894 983 separate instance for each distinct interaction.
895 984 """
896 985 def __init__(self, hasmultiplesend=False, buffersends=True):
897 986 """Create a new instance.
898 987
899 988 ``hasmultiplesend`` indicates whether multiple sends are supported
900 989 by the transport. When True, it is possible to send commands immediately
901 990 instead of buffering until the caller signals an intent to finish a
902 991 send operation.
903 992
904 993 ``buffercommands`` indicates whether sends should be buffered until the
905 994 last request has been issued.
906 995 """
907 996 self._hasmultiplesend = hasmultiplesend
908 997 self._buffersends = buffersends
909 998
910 999 self._canissuecommands = True
911 1000 self._cansend = True
912 1001
913 1002 self._nextrequestid = 1
914 1003 # We only support a single outgoing stream for now.
915 1004 self._outgoingstream = stream(1)
916 1005 self._pendingrequests = collections.deque()
917 1006 self._activerequests = {}
918 1007 self._incomingstreams = {}
919 1008
920 1009 def callcommand(self, name, args, datafh=None):
921 1010 """Request that a command be executed.
922 1011
923 1012 Receives the command name, a dict of arguments to pass to the command,
924 1013 and an optional file object containing the raw data for the command.
925 1014
926 1015 Returns a 3-tuple of (request, action, action data).
927 1016 """
928 1017 if not self._canissuecommands:
929 1018 raise error.ProgrammingError('cannot issue new commands')
930 1019
931 1020 requestid = self._nextrequestid
932 1021 self._nextrequestid += 2
933 1022
934 1023 request = commandrequest(requestid, name, args, datafh=datafh)
935 1024
936 1025 if self._buffersends:
937 1026 self._pendingrequests.append(request)
938 1027 return request, 'noop', {}
939 1028 else:
940 1029 if not self._cansend:
941 1030 raise error.ProgrammingError('sends cannot be performed on '
942 1031 'this instance')
943 1032
944 1033 if not self._hasmultiplesend:
945 1034 self._cansend = False
946 1035 self._canissuecommands = False
947 1036
948 1037 return request, 'sendframes', {
949 1038 'framegen': self._makecommandframes(request),
950 1039 }
951 1040
952 1041 def flushcommands(self):
953 1042 """Request that all queued commands be sent.
954 1043
955 1044 If any commands are buffered, this will instruct the caller to send
956 1045 them over the wire. If no commands are buffered it instructs the client
957 1046 to no-op.
958 1047
959 1048 If instances aren't configured for multiple sends, no new command
960 1049 requests are allowed after this is called.
961 1050 """
962 1051 if not self._pendingrequests:
963 1052 return 'noop', {}
964 1053
965 1054 if not self._cansend:
966 1055 raise error.ProgrammingError('sends cannot be performed on this '
967 1056 'instance')
968 1057
969 1058 # If the instance only allows sending once, mark that we have fired
970 1059 # our one shot.
971 1060 if not self._hasmultiplesend:
972 1061 self._canissuecommands = False
973 1062 self._cansend = False
974 1063
975 1064 def makeframes():
976 1065 while self._pendingrequests:
977 1066 request = self._pendingrequests.popleft()
978 1067 for frame in self._makecommandframes(request):
979 1068 yield frame
980 1069
981 1070 return 'sendframes', {
982 1071 'framegen': makeframes(),
983 1072 }
984 1073
985 1074 def _makecommandframes(self, request):
986 1075 """Emit frames to issue a command request.
987 1076
988 1077 As a side-effect, update request accounting to reflect its changed
989 1078 state.
990 1079 """
991 1080 self._activerequests[request.requestid] = request
992 1081 request.state = 'sending'
993 1082
994 1083 res = createcommandframes(self._outgoingstream,
995 1084 request.requestid,
996 1085 request.name,
997 1086 request.args,
998 1087 request.datafh)
999 1088
1000 1089 for frame in res:
1001 1090 yield frame
1002 1091
1003 1092 request.state = 'sent'
1004 1093
1005 1094 def onframerecv(self, frame):
1006 1095 """Process a frame that has been received off the wire.
1007 1096
1008 1097 Returns a 2-tuple of (action, meta) describing further action the
1009 1098 caller needs to take as a result of receiving this frame.
1010 1099 """
1011 1100 if frame.streamid % 2:
1012 1101 return 'error', {
1013 1102 'message': (
1014 1103 _('received frame with odd numbered stream ID: %d') %
1015 1104 frame.streamid),
1016 1105 }
1017 1106
1018 1107 if frame.streamid not in self._incomingstreams:
1019 1108 if not frame.streamflags & STREAM_FLAG_BEGIN_STREAM:
1020 1109 return 'error', {
1021 1110 'message': _('received frame on unknown stream '
1022 1111 'without beginning of stream flag set'),
1023 1112 }
1024 1113
1025 1114 self._incomingstreams[frame.streamid] = stream(frame.streamid)
1026 1115
1027 1116 if frame.streamflags & STREAM_FLAG_ENCODING_APPLIED:
1028 1117 raise error.ProgrammingError('support for decoding stream '
1029 1118 'payloads not yet implemneted')
1030 1119
1031 1120 if frame.streamflags & STREAM_FLAG_END_STREAM:
1032 1121 del self._incomingstreams[frame.streamid]
1033 1122
1034 1123 if frame.requestid not in self._activerequests:
1035 1124 return 'error', {
1036 1125 'message': (_('received frame for inactive request ID: %d') %
1037 1126 frame.requestid),
1038 1127 }
1039 1128
1040 1129 request = self._activerequests[frame.requestid]
1041 1130 request.state = 'receiving'
1042 1131
1043 1132 handlers = {
1044 1133 FRAME_TYPE_COMMAND_RESPONSE: self._oncommandresponseframe,
1045 1134 FRAME_TYPE_ERROR_RESPONSE: self._onerrorresponseframe,
1046 1135 }
1047 1136
1048 1137 meth = handlers.get(frame.typeid)
1049 1138 if not meth:
1050 1139 raise error.ProgrammingError('unhandled frame type: %d' %
1051 1140 frame.typeid)
1052 1141
1053 1142 return meth(request, frame)
1054 1143
1055 1144 def _oncommandresponseframe(self, request, frame):
1056 1145 if frame.flags & FLAG_COMMAND_RESPONSE_EOS:
1057 1146 request.state = 'received'
1058 1147 del self._activerequests[request.requestid]
1059 1148
1060 1149 return 'responsedata', {
1061 1150 'request': request,
1062 1151 'expectmore': frame.flags & FLAG_COMMAND_RESPONSE_CONTINUATION,
1063 1152 'eos': frame.flags & FLAG_COMMAND_RESPONSE_EOS,
1064 1153 'data': frame.payload,
1065 1154 }
1066 1155
1067 1156 def _onerrorresponseframe(self, request, frame):
1068 1157 request.state = 'errored'
1069 1158 del self._activerequests[request.requestid]
1070 1159
1071 1160 # The payload should be a CBOR map.
1072 1161 m = cbor.loads(frame.payload)
1073 1162
1074 1163 return 'error', {
1075 1164 'request': request,
1076 1165 'type': m['type'],
1077 1166 'message': m['message'],
1078 1167 }
@@ -1,227 +1,243
1 1 # Copyright 2018 Gregory Szorc <gregory.szorc@gmail.com>
2 2 #
3 3 # This software may be used and distributed according to the terms of the
4 4 # GNU General Public License version 2 or any later version.
5 5
6 6 from __future__ import absolute_import
7 7
8 8 from .node import (
9 9 bin,
10 10 hex,
11 11 )
12 12 from .thirdparty.zope import (
13 13 interface as zi,
14 14 )
15 15
16 16 # Names of the SSH protocol implementations.
17 17 SSHV1 = 'ssh-v1'
18 18 # These are advertised over the wire. Increment the counters at the end
19 19 # to reflect BC breakages.
20 20 SSHV2 = 'exp-ssh-v2-0001'
21 21 HTTP_WIREPROTO_V2 = 'exp-http-v2-0001'
22 22
23 23 # All available wire protocol transports.
24 24 TRANSPORTS = {
25 25 SSHV1: {
26 26 'transport': 'ssh',
27 27 'version': 1,
28 28 },
29 29 SSHV2: {
30 30 'transport': 'ssh',
31 31 # TODO mark as version 2 once all commands are implemented.
32 32 'version': 1,
33 33 },
34 34 'http-v1': {
35 35 'transport': 'http',
36 36 'version': 1,
37 37 },
38 38 HTTP_WIREPROTO_V2: {
39 39 'transport': 'http',
40 40 'version': 2,
41 41 }
42 42 }
43 43
44 44 class bytesresponse(object):
45 45 """A wire protocol response consisting of raw bytes."""
46 46 def __init__(self, data):
47 47 self.data = data
48 48
49 49 class ooberror(object):
50 50 """wireproto reply: failure of a batch of operation
51 51
52 52 Something failed during a batch call. The error message is stored in
53 53 `self.message`.
54 54 """
55 55 def __init__(self, message):
56 56 self.message = message
57 57
58 58 class pushres(object):
59 59 """wireproto reply: success with simple integer return
60 60
61 61 The call was successful and returned an integer contained in `self.res`.
62 62 """
63 63 def __init__(self, res, output):
64 64 self.res = res
65 65 self.output = output
66 66
67 67 class pusherr(object):
68 68 """wireproto reply: failure
69 69
70 70 The call failed. The `self.res` attribute contains the error message.
71 71 """
72 72 def __init__(self, res, output):
73 73 self.res = res
74 74 self.output = output
75 75
76 76 class streamres(object):
77 77 """wireproto reply: binary stream
78 78
79 79 The call was successful and the result is a stream.
80 80
81 81 Accepts a generator containing chunks of data to be sent to the client.
82 82
83 83 ``prefer_uncompressed`` indicates that the data is expected to be
84 84 uncompressable and that the stream should therefore use the ``none``
85 85 engine.
86 86 """
87 87 def __init__(self, gen=None, prefer_uncompressed=False):
88 88 self.gen = gen
89 89 self.prefer_uncompressed = prefer_uncompressed
90 90
91 91 class streamreslegacy(object):
92 92 """wireproto reply: uncompressed binary stream
93 93
94 94 The call was successful and the result is a stream.
95 95
96 96 Accepts a generator containing chunks of data to be sent to the client.
97 97
98 98 Like ``streamres``, but sends an uncompressed data for "version 1" clients
99 99 using the application/mercurial-0.1 media type.
100 100 """
101 101 def __init__(self, gen=None):
102 102 self.gen = gen
103 103
104 104 class cborresponse(object):
105 105 """Encode the response value as CBOR."""
106 106 def __init__(self, v):
107 107 self.value = v
108 108
109 class v2errorresponse(object):
110 """Represents a command error for version 2 transports."""
111 def __init__(self, message, args=None):
112 self.message = message
113 self.args = args
114
115 class v2streamingresponse(object):
116 """A response whose data is supplied by a generator.
117
118 The generator can either consist of data structures to CBOR
119 encode or a stream of already-encoded bytes.
120 """
121 def __init__(self, gen, compressible=True):
122 self.gen = gen
123 self.compressible = compressible
124
109 125 # list of nodes encoding / decoding
110 126 def decodelist(l, sep=' '):
111 127 if l:
112 128 return [bin(v) for v in l.split(sep)]
113 129 return []
114 130
115 131 def encodelist(l, sep=' '):
116 132 try:
117 133 return sep.join(map(hex, l))
118 134 except TypeError:
119 135 raise
120 136
121 137 # batched call argument encoding
122 138
123 139 def escapebatcharg(plain):
124 140 return (plain
125 141 .replace(':', ':c')
126 142 .replace(',', ':o')
127 143 .replace(';', ':s')
128 144 .replace('=', ':e'))
129 145
130 146 def unescapebatcharg(escaped):
131 147 return (escaped
132 148 .replace(':e', '=')
133 149 .replace(':s', ';')
134 150 .replace(':o', ',')
135 151 .replace(':c', ':'))
136 152
137 153 # mapping of options accepted by getbundle and their types
138 154 #
139 155 # Meant to be extended by extensions. It is extensions responsibility to ensure
140 156 # such options are properly processed in exchange.getbundle.
141 157 #
142 158 # supported types are:
143 159 #
144 160 # :nodes: list of binary nodes
145 161 # :csv: list of comma-separated values
146 162 # :scsv: list of comma-separated values return as set
147 163 # :plain: string with no transformation needed.
148 164 GETBUNDLE_ARGUMENTS = {
149 165 'heads': 'nodes',
150 166 'bookmarks': 'boolean',
151 167 'common': 'nodes',
152 168 'obsmarkers': 'boolean',
153 169 'phases': 'boolean',
154 170 'bundlecaps': 'scsv',
155 171 'listkeys': 'csv',
156 172 'cg': 'boolean',
157 173 'cbattempted': 'boolean',
158 174 'stream': 'boolean',
159 175 }
160 176
161 177 class baseprotocolhandler(zi.Interface):
162 178 """Abstract base class for wire protocol handlers.
163 179
164 180 A wire protocol handler serves as an interface between protocol command
165 181 handlers and the wire protocol transport layer. Protocol handlers provide
166 182 methods to read command arguments, redirect stdio for the duration of
167 183 the request, handle response types, etc.
168 184 """
169 185
170 186 name = zi.Attribute(
171 187 """The name of the protocol implementation.
172 188
173 189 Used for uniquely identifying the transport type.
174 190 """)
175 191
176 192 def getargs(args):
177 193 """return the value for arguments in <args>
178 194
179 195 For version 1 transports, returns a list of values in the same
180 196 order they appear in ``args``. For version 2 transports, returns
181 197 a dict mapping argument name to value.
182 198 """
183 199
184 200 def getprotocaps():
185 201 """Returns the list of protocol-level capabilities of client
186 202
187 203 Returns a list of capabilities as declared by the client for
188 204 the current request (or connection for stateful protocol handlers)."""
189 205
190 206 def getpayload():
191 207 """Provide a generator for the raw payload.
192 208
193 209 The caller is responsible for ensuring that the full payload is
194 210 processed.
195 211 """
196 212
197 213 def mayberedirectstdio():
198 214 """Context manager to possibly redirect stdio.
199 215
200 216 The context manager yields a file-object like object that receives
201 217 stdout and stderr output when the context manager is active. Or it
202 218 yields ``None`` if no I/O redirection occurs.
203 219
204 220 The intent of this context manager is to capture stdio output
205 221 so it may be sent in the response. Some transports support streaming
206 222 stdio to the client in real time. For these transports, stdio output
207 223 won't be captured.
208 224 """
209 225
210 226 def client():
211 227 """Returns a string representation of this client (as bytes)."""
212 228
213 229 def addcapabilities(repo, caps):
214 230 """Adds advertised capabilities specific to this protocol.
215 231
216 232 Receives the list of capabilities collected so far.
217 233
218 234 Returns a list of capabilities. The passed in argument can be returned.
219 235 """
220 236
221 237 def checkperm(perm):
222 238 """Validate that the client has permissions to perform a request.
223 239
224 240 The argument is the permission required to proceed. If the client
225 241 doesn't have that permission, the exception should raise or abort
226 242 in a protocol specific manner.
227 243 """
@@ -1,480 +1,489
1 1 # Copyright 21 May 2005 - (c) 2005 Jake Edge <jake@edge2.net>
2 2 # Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
3 3 #
4 4 # This software may be used and distributed according to the terms of the
5 5 # GNU General Public License version 2 or any later version.
6 6
7 7 from __future__ import absolute_import
8 8
9 9 import contextlib
10 10
11 11 from .i18n import _
12 12 from .thirdparty import (
13 13 cbor,
14 14 )
15 15 from .thirdparty.zope import (
16 16 interface as zi,
17 17 )
18 18 from . import (
19 19 encoding,
20 20 error,
21 21 pycompat,
22 22 streamclone,
23 23 util,
24 24 wireproto,
25 25 wireprotoframing,
26 26 wireprototypes,
27 27 )
28 28
29 29 FRAMINGTYPE = b'application/mercurial-exp-framing-0005'
30 30
31 31 HTTP_WIREPROTO_V2 = wireprototypes.HTTP_WIREPROTO_V2
32 32
33 33 def handlehttpv2request(rctx, req, res, checkperm, urlparts):
34 34 from .hgweb import common as hgwebcommon
35 35
36 36 # URL space looks like: <permissions>/<command>, where <permission> can
37 37 # be ``ro`` or ``rw`` to signal read-only or read-write, respectively.
38 38
39 39 # Root URL does nothing meaningful... yet.
40 40 if not urlparts:
41 41 res.status = b'200 OK'
42 42 res.headers[b'Content-Type'] = b'text/plain'
43 43 res.setbodybytes(_('HTTP version 2 API handler'))
44 44 return
45 45
46 46 if len(urlparts) == 1:
47 47 res.status = b'404 Not Found'
48 48 res.headers[b'Content-Type'] = b'text/plain'
49 49 res.setbodybytes(_('do not know how to process %s\n') %
50 50 req.dispatchpath)
51 51 return
52 52
53 53 permission, command = urlparts[0:2]
54 54
55 55 if permission not in (b'ro', b'rw'):
56 56 res.status = b'404 Not Found'
57 57 res.headers[b'Content-Type'] = b'text/plain'
58 58 res.setbodybytes(_('unknown permission: %s') % permission)
59 59 return
60 60
61 61 if req.method != 'POST':
62 62 res.status = b'405 Method Not Allowed'
63 63 res.headers[b'Allow'] = b'POST'
64 64 res.setbodybytes(_('commands require POST requests'))
65 65 return
66 66
67 67 # At some point we'll want to use our own API instead of recycling the
68 68 # behavior of version 1 of the wire protocol...
69 69 # TODO return reasonable responses - not responses that overload the
70 70 # HTTP status line message for error reporting.
71 71 try:
72 72 checkperm(rctx, req, 'pull' if permission == b'ro' else 'push')
73 73 except hgwebcommon.ErrorResponse as e:
74 74 res.status = hgwebcommon.statusmessage(e.code, pycompat.bytestr(e))
75 75 for k, v in e.headers:
76 76 res.headers[k] = v
77 77 res.setbodybytes('permission denied')
78 78 return
79 79
80 80 # We have a special endpoint to reflect the request back at the client.
81 81 if command == b'debugreflect':
82 82 _processhttpv2reflectrequest(rctx.repo.ui, rctx.repo, req, res)
83 83 return
84 84
85 85 # Extra commands that we handle that aren't really wire protocol
86 86 # commands. Think extra hard before making this hackery available to
87 87 # extension.
88 88 extracommands = {'multirequest'}
89 89
90 90 if command not in wireproto.commandsv2 and command not in extracommands:
91 91 res.status = b'404 Not Found'
92 92 res.headers[b'Content-Type'] = b'text/plain'
93 93 res.setbodybytes(_('unknown wire protocol command: %s\n') % command)
94 94 return
95 95
96 96 repo = rctx.repo
97 97 ui = repo.ui
98 98
99 99 proto = httpv2protocolhandler(req, ui)
100 100
101 101 if (not wireproto.commandsv2.commandavailable(command, proto)
102 102 and command not in extracommands):
103 103 res.status = b'404 Not Found'
104 104 res.headers[b'Content-Type'] = b'text/plain'
105 105 res.setbodybytes(_('invalid wire protocol command: %s') % command)
106 106 return
107 107
108 108 # TODO consider cases where proxies may add additional Accept headers.
109 109 if req.headers.get(b'Accept') != FRAMINGTYPE:
110 110 res.status = b'406 Not Acceptable'
111 111 res.headers[b'Content-Type'] = b'text/plain'
112 112 res.setbodybytes(_('client MUST specify Accept header with value: %s\n')
113 113 % FRAMINGTYPE)
114 114 return
115 115
116 116 if req.headers.get(b'Content-Type') != FRAMINGTYPE:
117 117 res.status = b'415 Unsupported Media Type'
118 118 # TODO we should send a response with appropriate media type,
119 119 # since client does Accept it.
120 120 res.headers[b'Content-Type'] = b'text/plain'
121 121 res.setbodybytes(_('client MUST send Content-Type header with '
122 122 'value: %s\n') % FRAMINGTYPE)
123 123 return
124 124
125 125 _processhttpv2request(ui, repo, req, res, permission, command, proto)
126 126
127 127 def _processhttpv2reflectrequest(ui, repo, req, res):
128 128 """Reads unified frame protocol request and dumps out state to client.
129 129
130 130 This special endpoint can be used to help debug the wire protocol.
131 131
132 132 Instead of routing the request through the normal dispatch mechanism,
133 133 we instead read all frames, decode them, and feed them into our state
134 134 tracker. We then dump the log of all that activity back out to the
135 135 client.
136 136 """
137 137 import json
138 138
139 139 # Reflection APIs have a history of being abused, accidentally disclosing
140 140 # sensitive data, etc. So we have a config knob.
141 141 if not ui.configbool('experimental', 'web.api.debugreflect'):
142 142 res.status = b'404 Not Found'
143 143 res.headers[b'Content-Type'] = b'text/plain'
144 144 res.setbodybytes(_('debugreflect service not available'))
145 145 return
146 146
147 147 # We assume we have a unified framing protocol request body.
148 148
149 149 reactor = wireprotoframing.serverreactor()
150 150 states = []
151 151
152 152 while True:
153 153 frame = wireprotoframing.readframe(req.bodyfh)
154 154
155 155 if not frame:
156 156 states.append(b'received: <no frame>')
157 157 break
158 158
159 159 states.append(b'received: %d %d %d %s' % (frame.typeid, frame.flags,
160 160 frame.requestid,
161 161 frame.payload))
162 162
163 163 action, meta = reactor.onframerecv(frame)
164 164 states.append(json.dumps((action, meta), sort_keys=True,
165 165 separators=(', ', ': ')))
166 166
167 167 action, meta = reactor.oninputeof()
168 168 meta['action'] = action
169 169 states.append(json.dumps(meta, sort_keys=True, separators=(', ',': ')))
170 170
171 171 res.status = b'200 OK'
172 172 res.headers[b'Content-Type'] = b'text/plain'
173 173 res.setbodybytes(b'\n'.join(states))
174 174
175 175 def _processhttpv2request(ui, repo, req, res, authedperm, reqcommand, proto):
176 176 """Post-validation handler for HTTPv2 requests.
177 177
178 178 Called when the HTTP request contains unified frame-based protocol
179 179 frames for evaluation.
180 180 """
181 181 # TODO Some HTTP clients are full duplex and can receive data before
182 182 # the entire request is transmitted. Figure out a way to indicate support
183 183 # for that so we can opt into full duplex mode.
184 184 reactor = wireprotoframing.serverreactor(deferoutput=True)
185 185 seencommand = False
186 186
187 187 outstream = reactor.makeoutputstream()
188 188
189 189 while True:
190 190 frame = wireprotoframing.readframe(req.bodyfh)
191 191 if not frame:
192 192 break
193 193
194 194 action, meta = reactor.onframerecv(frame)
195 195
196 196 if action == 'wantframe':
197 197 # Need more data before we can do anything.
198 198 continue
199 199 elif action == 'runcommand':
200 200 sentoutput = _httpv2runcommand(ui, repo, req, res, authedperm,
201 201 reqcommand, reactor, outstream,
202 202 meta, issubsequent=seencommand)
203 203
204 204 if sentoutput:
205 205 return
206 206
207 207 seencommand = True
208 208
209 209 elif action == 'error':
210 210 # TODO define proper error mechanism.
211 211 res.status = b'200 OK'
212 212 res.headers[b'Content-Type'] = b'text/plain'
213 213 res.setbodybytes(meta['message'] + b'\n')
214 214 return
215 215 else:
216 216 raise error.ProgrammingError(
217 217 'unhandled action from frame processor: %s' % action)
218 218
219 219 action, meta = reactor.oninputeof()
220 220 if action == 'sendframes':
221 221 # We assume we haven't started sending the response yet. If we're
222 222 # wrong, the response type will raise an exception.
223 223 res.status = b'200 OK'
224 224 res.headers[b'Content-Type'] = FRAMINGTYPE
225 225 res.setbodygen(meta['framegen'])
226 226 elif action == 'noop':
227 227 pass
228 228 else:
229 229 raise error.ProgrammingError('unhandled action from frame processor: %s'
230 230 % action)
231 231
232 232 def _httpv2runcommand(ui, repo, req, res, authedperm, reqcommand, reactor,
233 233 outstream, command, issubsequent):
234 234 """Dispatch a wire protocol command made from HTTPv2 requests.
235 235
236 236 The authenticated permission (``authedperm``) along with the original
237 237 command from the URL (``reqcommand``) are passed in.
238 238 """
239 239 # We already validated that the session has permissions to perform the
240 240 # actions in ``authedperm``. In the unified frame protocol, the canonical
241 241 # command to run is expressed in a frame. However, the URL also requested
242 242 # to run a specific command. We need to be careful that the command we
243 243 # run doesn't have permissions requirements greater than what was granted
244 244 # by ``authedperm``.
245 245 #
246 246 # Our rule for this is we only allow one command per HTTP request and
247 247 # that command must match the command in the URL. However, we make
248 248 # an exception for the ``multirequest`` URL. This URL is allowed to
249 249 # execute multiple commands. We double check permissions of each command
250 250 # as it is invoked to ensure there is no privilege escalation.
251 251 # TODO consider allowing multiple commands to regular command URLs
252 252 # iff each command is the same.
253 253
254 254 proto = httpv2protocolhandler(req, ui, args=command['args'])
255 255
256 256 if reqcommand == b'multirequest':
257 257 if not wireproto.commandsv2.commandavailable(command['command'], proto):
258 258 # TODO proper error mechanism
259 259 res.status = b'200 OK'
260 260 res.headers[b'Content-Type'] = b'text/plain'
261 261 res.setbodybytes(_('wire protocol command not available: %s') %
262 262 command['command'])
263 263 return True
264 264
265 265 # TODO don't use assert here, since it may be elided by -O.
266 266 assert authedperm in (b'ro', b'rw')
267 267 wirecommand = wireproto.commandsv2[command['command']]
268 268 assert wirecommand.permission in ('push', 'pull')
269 269
270 270 if authedperm == b'ro' and wirecommand.permission != 'pull':
271 271 # TODO proper error mechanism
272 272 res.status = b'403 Forbidden'
273 273 res.headers[b'Content-Type'] = b'text/plain'
274 274 res.setbodybytes(_('insufficient permissions to execute '
275 275 'command: %s') % command['command'])
276 276 return True
277 277
278 278 # TODO should we also call checkperm() here? Maybe not if we're going
279 279 # to overhaul that API. The granted scope from the URL check should
280 280 # be good enough.
281 281
282 282 else:
283 283 # Don't allow multiple commands outside of ``multirequest`` URL.
284 284 if issubsequent:
285 285 # TODO proper error mechanism
286 286 res.status = b'200 OK'
287 287 res.headers[b'Content-Type'] = b'text/plain'
288 288 res.setbodybytes(_('multiple commands cannot be issued to this '
289 289 'URL'))
290 290 return True
291 291
292 292 if reqcommand != command['command']:
293 293 # TODO define proper error mechanism
294 294 res.status = b'200 OK'
295 295 res.headers[b'Content-Type'] = b'text/plain'
296 296 res.setbodybytes(_('command in frame must match command in URL'))
297 297 return True
298 298
299 299 rsp = wireproto.dispatch(repo, proto, command['command'])
300 300
301 301 res.status = b'200 OK'
302 302 res.headers[b'Content-Type'] = FRAMINGTYPE
303 303
304 304 if isinstance(rsp, wireprototypes.cborresponse):
305 305 encoded = cbor.dumps(rsp.value, canonical=True)
306 306 action, meta = reactor.oncommandresponseready(outstream,
307 307 command['requestid'],
308 308 encoded)
309 elif isinstance(rsp, wireprototypes.v2streamingresponse):
310 action, meta = reactor.oncommandresponsereadygen(outstream,
311 command['requestid'],
312 rsp.gen)
313 elif isinstance(rsp, wireprototypes.v2errorresponse):
314 action, meta = reactor.oncommanderror(outstream,
315 command['requestid'],
316 rsp.message,
317 rsp.args)
309 318 else:
310 319 action, meta = reactor.onservererror(
311 320 _('unhandled response type from wire proto command'))
312 321
313 322 if action == 'sendframes':
314 323 res.setbodygen(meta['framegen'])
315 324 return True
316 325 elif action == 'noop':
317 326 return False
318 327 else:
319 328 raise error.ProgrammingError('unhandled event from reactor: %s' %
320 329 action)
321 330
322 331 @zi.implementer(wireprototypes.baseprotocolhandler)
323 332 class httpv2protocolhandler(object):
324 333 def __init__(self, req, ui, args=None):
325 334 self._req = req
326 335 self._ui = ui
327 336 self._args = args
328 337
329 338 @property
330 339 def name(self):
331 340 return HTTP_WIREPROTO_V2
332 341
333 342 def getargs(self, args):
334 343 data = {}
335 344 for k, typ in args.items():
336 345 if k == '*':
337 346 raise NotImplementedError('do not support * args')
338 347 elif k in self._args:
339 348 # TODO consider validating value types.
340 349 data[k] = self._args[k]
341 350
342 351 return data
343 352
344 353 def getprotocaps(self):
345 354 # Protocol capabilities are currently not implemented for HTTP V2.
346 355 return set()
347 356
348 357 def getpayload(self):
349 358 raise NotImplementedError
350 359
351 360 @contextlib.contextmanager
352 361 def mayberedirectstdio(self):
353 362 raise NotImplementedError
354 363
355 364 def client(self):
356 365 raise NotImplementedError
357 366
358 367 def addcapabilities(self, repo, caps):
359 368 return caps
360 369
361 370 def checkperm(self, perm):
362 371 raise NotImplementedError
363 372
364 373 def httpv2apidescriptor(req, repo):
365 374 proto = httpv2protocolhandler(req, repo.ui)
366 375
367 376 return _capabilitiesv2(repo, proto)
368 377
369 378 def _capabilitiesv2(repo, proto):
370 379 """Obtain the set of capabilities for version 2 transports.
371 380
372 381 These capabilities are distinct from the capabilities for version 1
373 382 transports.
374 383 """
375 384 compression = []
376 385 for engine in wireproto.supportedcompengines(repo.ui, util.SERVERROLE):
377 386 compression.append({
378 387 b'name': engine.wireprotosupport().name,
379 388 })
380 389
381 390 caps = {
382 391 'commands': {},
383 392 'compression': compression,
384 393 'framingmediatypes': [FRAMINGTYPE],
385 394 }
386 395
387 396 for command, entry in wireproto.commandsv2.items():
388 397 caps['commands'][command] = {
389 398 'args': entry.args,
390 399 'permissions': [entry.permission],
391 400 }
392 401
393 402 if streamclone.allowservergeneration(repo):
394 403 caps['rawrepoformats'] = sorted(repo.requirements &
395 404 repo.supportedformats)
396 405
397 406 return proto.addcapabilities(repo, caps)
398 407
399 408 def wireprotocommand(*args, **kwargs):
400 409 def register(func):
401 410 return wireproto.wireprotocommand(
402 411 *args, transportpolicy=wireproto.POLICY_V2_ONLY, **kwargs)(func)
403 412
404 413 return register
405 414
406 415 @wireprotocommand('branchmap', permission='pull')
407 416 def branchmapv2(repo, proto):
408 417 branchmap = {encoding.fromlocal(k): v
409 418 for k, v in repo.branchmap().iteritems()}
410 419
411 420 return wireprototypes.cborresponse(branchmap)
412 421
413 422 @wireprotocommand('capabilities', permission='pull')
414 423 def capabilitiesv2(repo, proto):
415 424 caps = _capabilitiesv2(repo, proto)
416 425
417 426 return wireprototypes.cborresponse(caps)
418 427
419 428 @wireprotocommand('heads',
420 429 args={
421 430 'publiconly': False,
422 431 },
423 432 permission='pull')
424 433 def headsv2(repo, proto, publiconly=False):
425 434 if publiconly:
426 435 repo = repo.filtered('immutable')
427 436
428 437 return wireprototypes.cborresponse(repo.heads())
429 438
430 439 @wireprotocommand('known',
431 440 args={
432 441 'nodes': [b'deadbeef'],
433 442 },
434 443 permission='pull')
435 444 def knownv2(repo, proto, nodes=None):
436 445 nodes = nodes or []
437 446 result = b''.join(b'1' if n else b'0' for n in repo.known(nodes))
438 447 return wireprototypes.cborresponse(result)
439 448
440 449 @wireprotocommand('listkeys',
441 450 args={
442 451 'namespace': b'ns',
443 452 },
444 453 permission='pull')
445 454 def listkeysv2(repo, proto, namespace=None):
446 455 keys = repo.listkeys(encoding.tolocal(namespace))
447 456 keys = {encoding.fromlocal(k): encoding.fromlocal(v)
448 457 for k, v in keys.iteritems()}
449 458
450 459 return wireprototypes.cborresponse(keys)
451 460
452 461 @wireprotocommand('lookup',
453 462 args={
454 463 'key': b'foo',
455 464 },
456 465 permission='pull')
457 466 def lookupv2(repo, proto, key):
458 467 key = encoding.tolocal(key)
459 468
460 469 # TODO handle exception.
461 470 node = repo.lookup(key)
462 471
463 472 return wireprototypes.cborresponse(node)
464 473
465 474 @wireprotocommand('pushkey',
466 475 args={
467 476 'namespace': b'ns',
468 477 'key': b'key',
469 478 'old': b'old',
470 479 'new': b'new',
471 480 },
472 481 permission='push')
473 482 def pushkeyv2(repo, proto, namespace, key, old, new):
474 483 # TODO handle ui output redirection
475 484 r = repo.pushkey(encoding.tolocal(namespace),
476 485 encoding.tolocal(key),
477 486 encoding.tolocal(old),
478 487 encoding.tolocal(new))
479 488
480 489 return wireprototypes.cborresponse(r)
General Comments 0
You need to be logged in to leave comments. Login now