wireproto: add streams to frame-based protocol

Previously, the frame-based protocol was just a series of frames, with each frame associated with a request ID.

In order to scale the protocol, we'll want to enable the use of compression. While it is possible to enable compression at the socket/pipe level, this has its disadvantages. The big one is it undermines the point of frames being standalone, atomic units that can be read and written: if you add compression above the framing protocol, you are back to having a stream-based protocol as opposed to something frame-based.

So in order to preserve frames, compression needs to occur at the frame payload level.

Compressing each frame's payload individually will limit compression ratios because the window size of the compressor will be limited by the max frame size, which is 32-64kb as currently defined. It will also add CPU overhead, as it is more efficient for compressors to operate on fewer, larger blocks of data than more, smaller blocks.

So compressing each frame independently is out. This means we need to compress each frame's payload as if it is part of a larger stream.

The simplest approach is to have 1 stream per connection. This could certainly work. However, it has disadvantages (documented below).

We could also have 1 stream per RPC/command invocation. (This is the model HTTP/2 goes with.) This also has disadvantages.

The main disadvantage to one global stream is that it has the very real potential to create CPU bottlenecks doing compression. Networks are only getting faster and the performance of single CPU cores has been relatively flat. Newer compression formats like zstandard offer better CPU cycle efficiency than predecessors like zlib. But it is still all too common to saturate your CPU with compression overhead long before you saturate the network pipe.

The main disadvantage with streams per request is that you can't reap the benefits of the compression context for multiple requests.
For example, if you send 1000 RPC requests (or HTTP/2 requests for that matter), the response to each would have its own compression context. The overall size of the raw responses would be larger because compression contexts wouldn't be able to reference data from another request or response.

The approach for streams as implemented in this commit is to support N streams per connection and for streams to potentially span requests and responses. As explained by the added internals docs, this facilitates servers and clients delegating independent streams and compression to independent threads / CPU cores. This helps alleviate the CPU bottleneck of compression. This design also allows compression contexts to be reused across requests/responses. This can result in improved compression ratios and less overhead for compressors and decompressors having to build new contexts.

Another feature that was defined was the ability for individual frames within a stream to declare whether that individual frame's payload uses the content encoding (read: compression) defined by the stream. The idea here is that some servers may serve data from a combination of caches and dynamic resolution. Data coming from caches may be pre-compressed. We want to facilitate servers being able to essentially stream bytes from caches to the wire with minimal overhead. Being able to mix and match which frames are compressed within a stream enables these types of advanced server functionality.

This commit defines the new streams mechanism. Basic code for supporting streams in frames has been added. But that code is seriously lacking and doesn't fully conform to the defined protocol. For example, we don't close any streams. And support for content encoding within streams is not yet implemented. The change was rather invasive and I didn't think it would be reasonable to implement the entire feature in a single commit.
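The compression-context argument above can be illustrated with a small sketch. It is not part of this commit: it uses `zlib` from the Python standard library as a stand-in for whatever encoding a stream would negotiate, and the helper names and sample responses are hypothetical. It compares compressing many small responses with one fresh context each against pushing them all through a single long-lived context.

```python
import zlib


def compress_independently(responses):
    """Compress each response with its own fresh compression context."""
    return [zlib.compress(r) for r in responses]


def compress_shared(responses):
    """Compress every response through one long-lived context.

    Z_SYNC_FLUSH emits all pending output at each response boundary, so
    every chunk can be written to the wire as soon as it is produced.
    """
    ctx = zlib.compressobj()
    return [ctx.compress(r) + ctx.flush(zlib.Z_SYNC_FLUSH) for r in responses]


def decompress_shared(chunks):
    """Decode chunks produced by compress_shared(), in order."""
    ctx = zlib.decompressobj()
    return [ctx.decompress(c) for c in chunks]


# Hypothetical small, similar responses (think 1000 RPC replies).
responses = [b'changeset %04d: user=alice branch=default files=src/module.py\n' % i
             for i in range(1000)]

independent_size = sum(len(c) for c in compress_independently(responses))
shared_chunks = compress_shared(responses)
shared_size = sum(len(c) for c in shared_chunks)

print('independent contexts: %d bytes' % independent_size)
print('shared context:       %d bytes' % shared_size)
```

With a shared context, later responses can reference bytes already seen in earlier ones, which is why the total is smaller. The trade-off is that chunks must be decoded in order by a decompressor that has seen all preceding chunks of the same stream.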
For the record, I would have loved to reuse an existing multiplexing protocol to build the new wire protocol on top of. However, I couldn't find a protocol that offers the performance and scaling characteristics that I desired. Namely, it should support multiple compression contexts to facilitate scaling out to multiple CPU cores and compression contexts should be able to live longer than single RPC requests. HTTP/2 *almost* fits the bill. But the semantics of HTTP message exchange state that streams can only live for a single request-response. We /could/ tunnel on top of HTTP/2 streams and frames with HEADER and DATA frames. But there's no guarantee that HTTP/2 libraries and proxies would allow us to use HTTP/2 streams and frames without the HTTP message exchange semantics defined in RFC 7540 Section 8. Other RPC protocols like gRPC are built on top of HTTP/2 and thus preserve its semantics of stream per RPC invocation. Even QUIC does this. We could attempt to invent a higher-level stream that spans HTTP/2 streams. But this would be violating HTTP/2 because there is no guarantee that HTTP/2 streams are routed to the same server. The best we can do - which is what this protocol does - is shoehorn all request and response data into a single HTTP message and create streams within. At that point, we've defined a Content-Type in HTTP parlance. It just so happens our media type can also work as a standalone, stream-based protocol, without leaning on HTTP or a similar protocol.

Differential Revision: https://phab.mercurial-scm.org/D2907
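As a rough illustration of what this commit changes on the wire, the sketch below packs and unpacks the 8-byte frame header using the layout given in the comments of `wireprotoframing.py` (24-bit little-endian payload length, 16-bit request ID, 8-bit stream ID, 8-bit stream flags, 4-bit frame type, 4-bit frame flags). The function names `pack_header` and `unpack_header` are hypothetical and exist only for this example; it is a standalone approximation, not the module's API.

```python
import struct

FRAME_HEADER_SIZE = 8


def pack_header(length, requestid, streamid, streamflags, typeid, flags):
    """Build the 8-byte header described in the source comments."""
    header = bytearray(FRAME_HEADER_SIZE)
    # 24-bit little-endian length: low 3 bytes of a 32-bit value.
    header[0:3] = struct.pack('<I', length)[0:3]
    # 16-bit request ID, 8-bit stream ID, 8-bit stream flags.
    struct.pack_into('<HBB', header, 3, requestid, streamid, streamflags)
    # Frame type in the high nibble, frame flags in the low nibble.
    header[7] = (typeid << 4) | flags
    return header


def unpack_header(header):
    """Inverse of pack_header(); returns a tuple of the six fields."""
    # Byte 2 carries bits 16-23, so its weight is 1 << 16 (65536).
    length = header[0] | (header[1] << 8) | (header[2] << 16)
    requestid, streamid, streamflags = struct.unpack_from('<HBB', header, 3)
    typeid = header[7] >> 4
    flags = header[7] & 0x0f
    return length, requestid, streamid, streamflags, typeid, flags


# Length above 65535 exercises the third length byte.
fields = (70000, 1, 1, 0x01, 0x04, 0x02)
roundtripped = unpack_header(pack_header(*fields))
print(roundtripped)
```

Using a payload length above 65535 in the round trip is deliberate: it is the only way to exercise the most significant length byte.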

File last commit:

r37304:9bfcbe4f default
wireprotoframing.py
813 lines | 27.5 KiB | text/x-python | PythonLexer
# wireprotoframing.py - unified framing protocol for wire protocol
#
# Copyright 2018 Gregory Szorc <gregory.szorc@gmail.com>
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.

# This file contains functionality to support the unified frame-based wire
# protocol. For details about the protocol, see
# `hg help internals.wireprotocol`.

from __future__ import absolute_import

import struct

from .i18n import _
from .thirdparty import (
    attr,
)
from . import (
    error,
    util,
)
from .utils import (
    stringutil,
)

FRAME_HEADER_SIZE = 8
DEFAULT_MAX_FRAME_SIZE = 32768

STREAM_FLAG_BEGIN_STREAM = 0x01
STREAM_FLAG_END_STREAM = 0x02
STREAM_FLAG_ENCODING_APPLIED = 0x04

STREAM_FLAGS = {
    b'stream-begin': STREAM_FLAG_BEGIN_STREAM,
    b'stream-end': STREAM_FLAG_END_STREAM,
    b'encoded': STREAM_FLAG_ENCODING_APPLIED,
}

FRAME_TYPE_COMMAND_NAME = 0x01
FRAME_TYPE_COMMAND_ARGUMENT = 0x02
FRAME_TYPE_COMMAND_DATA = 0x03
FRAME_TYPE_BYTES_RESPONSE = 0x04
FRAME_TYPE_ERROR_RESPONSE = 0x05
FRAME_TYPE_TEXT_OUTPUT = 0x06
FRAME_TYPE_STREAM_SETTINGS = 0x08

FRAME_TYPES = {
    b'command-name': FRAME_TYPE_COMMAND_NAME,
    b'command-argument': FRAME_TYPE_COMMAND_ARGUMENT,
    b'command-data': FRAME_TYPE_COMMAND_DATA,
    b'bytes-response': FRAME_TYPE_BYTES_RESPONSE,
    b'error-response': FRAME_TYPE_ERROR_RESPONSE,
    b'text-output': FRAME_TYPE_TEXT_OUTPUT,
    b'stream-settings': FRAME_TYPE_STREAM_SETTINGS,
}

FLAG_COMMAND_NAME_EOS = 0x01
FLAG_COMMAND_NAME_HAVE_ARGS = 0x02
FLAG_COMMAND_NAME_HAVE_DATA = 0x04

FLAGS_COMMAND = {
    b'eos': FLAG_COMMAND_NAME_EOS,
    b'have-args': FLAG_COMMAND_NAME_HAVE_ARGS,
    b'have-data': FLAG_COMMAND_NAME_HAVE_DATA,
}

FLAG_COMMAND_ARGUMENT_CONTINUATION = 0x01
FLAG_COMMAND_ARGUMENT_EOA = 0x02

FLAGS_COMMAND_ARGUMENT = {
    b'continuation': FLAG_COMMAND_ARGUMENT_CONTINUATION,
    b'eoa': FLAG_COMMAND_ARGUMENT_EOA,
}

FLAG_COMMAND_DATA_CONTINUATION = 0x01
FLAG_COMMAND_DATA_EOS = 0x02

FLAGS_COMMAND_DATA = {
    b'continuation': FLAG_COMMAND_DATA_CONTINUATION,
    b'eos': FLAG_COMMAND_DATA_EOS,
}

FLAG_BYTES_RESPONSE_CONTINUATION = 0x01
FLAG_BYTES_RESPONSE_EOS = 0x02

FLAGS_BYTES_RESPONSE = {
    b'continuation': FLAG_BYTES_RESPONSE_CONTINUATION,
    b'eos': FLAG_BYTES_RESPONSE_EOS,
}

FLAG_ERROR_RESPONSE_PROTOCOL = 0x01
FLAG_ERROR_RESPONSE_APPLICATION = 0x02

FLAGS_ERROR_RESPONSE = {
    b'protocol': FLAG_ERROR_RESPONSE_PROTOCOL,
    b'application': FLAG_ERROR_RESPONSE_APPLICATION,
}

# Maps frame types to their available flags.
FRAME_TYPE_FLAGS = {
    FRAME_TYPE_COMMAND_NAME: FLAGS_COMMAND,
    FRAME_TYPE_COMMAND_ARGUMENT: FLAGS_COMMAND_ARGUMENT,
    FRAME_TYPE_COMMAND_DATA: FLAGS_COMMAND_DATA,
    FRAME_TYPE_BYTES_RESPONSE: FLAGS_BYTES_RESPONSE,
    FRAME_TYPE_ERROR_RESPONSE: FLAGS_ERROR_RESPONSE,
    FRAME_TYPE_TEXT_OUTPUT: {},
    FRAME_TYPE_STREAM_SETTINGS: {},
}

ARGUMENT_FRAME_HEADER = struct.Struct(r'<HH')

@attr.s(slots=True)
class frameheader(object):
    """Represents the data in a frame header."""

    length = attr.ib()
    requestid = attr.ib()
    streamid = attr.ib()
    streamflags = attr.ib()
    typeid = attr.ib()
    flags = attr.ib()

@attr.s(slots=True)
class frame(object):
    """Represents a parsed frame."""

    requestid = attr.ib()
    streamid = attr.ib()
    streamflags = attr.ib()
    typeid = attr.ib()
    flags = attr.ib()
    payload = attr.ib()

def makeframe(requestid, streamid, streamflags, typeid, flags, payload):
    """Assemble a frame into a byte array."""
    # TODO assert size of payload.
    frame = bytearray(FRAME_HEADER_SIZE + len(payload))

    # 24 bits length
    # 16 bits request id
    # 8 bits stream id
    # 8 bits stream flags
    # 4 bits type
    # 4 bits flags

    l = struct.pack(r'<I', len(payload))
    frame[0:3] = l[0:3]
    struct.pack_into(r'<HBB', frame, 3, requestid, streamid, streamflags)
    frame[7] = (typeid << 4) | flags
    frame[8:] = payload

    return frame

def makeframefromhumanstring(s):
    """Create a frame from a human readable string

    Strings have the form:

        <request-id> <stream-id> <stream-flags> <type> <flags> <payload>

    This can be used by user-facing applications and tests for creating
    frames easily without having to type out a bunch of constants.

    Request ID and stream IDs are integers.

    Stream flags, frame type, and flags can be specified by integer or
    named constant.

    Flags can be delimited by `|` to bitwise OR them together.
    """
    fields = s.split(b' ', 5)
    requestid, streamid, streamflags, frametype, frameflags, payload = fields

    requestid = int(requestid)
    streamid = int(streamid)

    finalstreamflags = 0
    for flag in streamflags.split(b'|'):
        if flag in STREAM_FLAGS:
            finalstreamflags |= STREAM_FLAGS[flag]
        else:
            finalstreamflags |= int(flag)

    if frametype in FRAME_TYPES:
        frametype = FRAME_TYPES[frametype]
    else:
        frametype = int(frametype)

    finalflags = 0
    validflags = FRAME_TYPE_FLAGS[frametype]
    for flag in frameflags.split(b'|'):
        if flag in validflags:
            finalflags |= validflags[flag]
        else:
            finalflags |= int(flag)

    payload = stringutil.unescapestr(payload)

    return makeframe(requestid=requestid, streamid=streamid,
                     streamflags=finalstreamflags, typeid=frametype,
                     flags=finalflags, payload=payload)

def parseheader(data):
    """Parse a unified framing protocol frame header from a buffer.

    The header is expected to be in the buffer at offset 0 and the
    buffer is expected to be large enough to hold a full header.
    """
    # 24 bits payload length (little endian)
    # 16 bits request ID
    # 8 bits stream ID
    # 8 bits stream flags
    # 4 bits frame type
    # 4 bits frame flags
    # ... payload
    # The third length byte holds bits 16-23, so its weight is 65536.
    framelength = data[0] + 256 * data[1] + 65536 * data[2]
    requestid, streamid, streamflags = struct.unpack_from(r'<HBB', data, 3)
    typeflags = data[7]

    frametype = (typeflags & 0xf0) >> 4
    frameflags = typeflags & 0x0f

    return frameheader(framelength, requestid, streamid, streamflags,
                       frametype, frameflags)

def readframe(fh):
    """Read a unified framing protocol frame from a file object.

    Returns a ``frame`` instance for the decoded frame or None if no
    frame is available. May raise if a malformed frame is seen.
    """
    header = bytearray(FRAME_HEADER_SIZE)

    readcount = fh.readinto(header)

    if readcount == 0:
        return None

    if readcount != FRAME_HEADER_SIZE:
        raise error.Abort(_('received incomplete frame: got %d bytes: %s') %
                          (readcount, header))

    h = parseheader(header)

    payload = fh.read(h.length)
    if len(payload) != h.length:
        raise error.Abort(_('frame length error: expected %d; got %d') %
                          (h.length, len(payload)))

    return frame(h.requestid, h.streamid, h.streamflags, h.typeid, h.flags,
                 payload)

def createcommandframes(stream, requestid, cmd, args, datafh=None):
    """Create frames necessary to transmit a request to run a command.

    This is a generator of bytearrays. Each item represents a frame
    ready to be sent over the wire to a peer.
    """
    flags = 0
    if args:
        flags |= FLAG_COMMAND_NAME_HAVE_ARGS
    if datafh:
        flags |= FLAG_COMMAND_NAME_HAVE_DATA

    if not flags:
        flags |= FLAG_COMMAND_NAME_EOS

    yield stream.makeframe(requestid=requestid, typeid=FRAME_TYPE_COMMAND_NAME,
                           flags=flags, payload=cmd)

    for i, k in enumerate(sorted(args)):
        v = args[k]
        last = i == len(args) - 1

        # TODO handle splitting of argument values across frames.
        payload = bytearray(ARGUMENT_FRAME_HEADER.size + len(k) + len(v))
        offset = 0
        ARGUMENT_FRAME_HEADER.pack_into(payload, offset, len(k), len(v))
        offset += ARGUMENT_FRAME_HEADER.size
        payload[offset:offset + len(k)] = k
        offset += len(k)
        payload[offset:offset + len(v)] = v

        flags = FLAG_COMMAND_ARGUMENT_EOA if last else 0
        yield stream.makeframe(requestid=requestid,
                               typeid=FRAME_TYPE_COMMAND_ARGUMENT,
                               flags=flags,
                               payload=payload)

    if datafh:
        while True:
            data = datafh.read(DEFAULT_MAX_FRAME_SIZE)

            done = False
            if len(data) == DEFAULT_MAX_FRAME_SIZE:
                flags = FLAG_COMMAND_DATA_CONTINUATION
            else:
                flags = FLAG_COMMAND_DATA_EOS
                assert datafh.read(1) == b''
                done = True

            yield stream.makeframe(requestid=requestid,
                                   typeid=FRAME_TYPE_COMMAND_DATA,
                                   flags=flags,
                                   payload=data)

            if done:
                break

def createbytesresponseframesfrombytes(stream, requestid, data,
                                       maxframesize=DEFAULT_MAX_FRAME_SIZE):
    """Create a raw frame to send a bytes response from static bytes input.

    Returns a generator of bytearrays.
    """

    # Simple case of a single frame.
    if len(data) <= maxframesize:
        yield stream.makeframe(requestid=requestid,
                               typeid=FRAME_TYPE_BYTES_RESPONSE,
                               flags=FLAG_BYTES_RESPONSE_EOS,
                               payload=data)
        return

    offset = 0
    while True:
        chunk = data[offset:offset + maxframesize]
        offset += len(chunk)
        done = offset == len(data)

        if done:
            flags = FLAG_BYTES_RESPONSE_EOS
        else:
            flags = FLAG_BYTES_RESPONSE_CONTINUATION

        yield stream.makeframe(requestid=requestid,
                               typeid=FRAME_TYPE_BYTES_RESPONSE,
                               flags=flags,
                               payload=chunk)

        if done:
            break

def createerrorframe(stream, requestid, msg, protocol=False, application=False):
    # TODO properly handle frame size limits.
    assert len(msg) <= DEFAULT_MAX_FRAME_SIZE

    flags = 0
    if protocol:
        flags |= FLAG_ERROR_RESPONSE_PROTOCOL
    if application:
        flags |= FLAG_ERROR_RESPONSE_APPLICATION

    yield stream.makeframe(requestid=requestid,
                           typeid=FRAME_TYPE_ERROR_RESPONSE,
                           flags=flags,
                           payload=msg)

def createtextoutputframe(stream, requestid, atoms):
    """Create a text output frame to render text to people.

    ``atoms`` is an iterable of 3-tuples of (formatting string, args, labels).

    The formatting string contains ``%s`` tokens to be replaced by the
    corresponding indexed entry in ``args``. ``labels`` is an iterable of
    formatters to be applied at rendering time. In terms of the ``ui``
    class, each atom corresponds to a ``ui.write()``.
    """
    bytesleft = DEFAULT_MAX_FRAME_SIZE
    atomchunks = []

    for (formatting, args, labels) in atoms:
        if len(args) > 255:
            raise ValueError('cannot use more than 255 formatting arguments')
        if len(labels) > 255:
            raise ValueError('cannot use more than 255 labels')

        # TODO look for localstr, other types here?

        if not isinstance(formatting, bytes):
            raise ValueError('must use bytes formatting strings')
        for arg in args:
            if not isinstance(arg, bytes):
                raise ValueError('must use bytes for arguments')
        for label in labels:
            if not isinstance(label, bytes):
                raise ValueError('must use bytes for labels')

        # Formatting string must be UTF-8.
        formatting = formatting.decode(r'utf-8', r'replace').encode(r'utf-8')

        # Arguments must be UTF-8.
        args = [a.decode(r'utf-8', r'replace').encode(r'utf-8') for a in args]

        # Labels must be ASCII.
        labels = [l.decode(r'ascii', r'strict').encode(r'ascii')
                  for l in labels]

        if len(formatting) > 65535:
            raise ValueError('formatting string cannot be longer than 64k')

        if any(len(a) > 65535 for a in args):
            raise ValueError('argument string cannot be longer than 64k')

        if any(len(l) > 255 for l in labels):
            raise ValueError('label string cannot be longer than 255 bytes')

        chunks = [
            struct.pack(r'<H', len(formatting)),
            struct.pack(r'<BB', len(labels), len(args)),
            struct.pack(r'<' + r'B' * len(labels), *map(len, labels)),
            struct.pack(r'<' + r'H' * len(args), *map(len, args)),
        ]

        chunks.append(formatting)
        chunks.extend(labels)
        chunks.extend(args)

        atom = b''.join(chunks)
        atomchunks.append(atom)
        bytesleft -= len(atom)

    if bytesleft < 0:
        raise ValueError('cannot encode data in a single frame')

    yield stream.makeframe(requestid=requestid,
                           typeid=FRAME_TYPE_TEXT_OUTPUT,
                           flags=0,
                           payload=b''.join(atomchunks))

class stream(object):
    """Represents a logical unidirectional series of frames."""

    def __init__(self, streamid, active=False):
        self.streamid = streamid
        # Honor the caller's ``active`` argument instead of always
        # starting inactive.
        self._active = active

    def makeframe(self, requestid, typeid, flags, payload):
        """Create a frame to be sent out over this stream.

        Only returns the frame instance. Does not actually send it.
        """
        streamflags = 0
        if not self._active:
            streamflags |= STREAM_FLAG_BEGIN_STREAM
            self._active = True

        return makeframe(requestid, self.streamid, streamflags, typeid, flags,
                         payload)

def ensureserverstream(stream):
    if stream.streamid % 2:
        raise error.ProgrammingError('server should only write to even '
                                     'numbered streams; %d is not even' %
                                     stream.streamid)

class serverreactor(object):
    """Holds state of a server handling frame-based protocol requests.

    This class is the "brain" of the unified frame-based protocol server
    component. While the protocol is stateless from the perspective of
    requests/commands, something needs to track which frames have been
    received, what frames to expect, etc. This class is that thing.

    Instances are modeled as a state machine of sorts. Instances are also
    reactionary to external events. The point of this class is to encapsulate
    the state of the connection and the exchange of frames, not to perform
    work. Instead, callers tell this class when something occurs, like a
    frame arriving. If that activity is worthy of a follow-up action (say
    *run a command*), the return value of that handler will say so.

    I/O and CPU intensive operations are purposefully delegated outside of
    this class.

    Consumers are expected to tell instances when events occur. They do so by
    calling the various ``on*`` methods. These methods return a 2-tuple
    describing any follow-up action(s) to take. The first element is the
    name of an action to perform. The second is a data structure (usually
    a dict) specific to that action that contains more information. e.g.
    if the server wants to send frames back to the client, the data structure
    will contain a reference to those frames.

    Valid actions that consumers can be instructed to take are:

    sendframes
       Indicates that frames should be sent to the client. The ``framegen``
       key contains a generator of frames that should be sent. The server
       assumes that all frames are sent to the client.

    error
       Indicates that an error occurred. Consumer should probably abort.

    runcommand
       Indicates that the consumer should run a wire protocol command. Details
       of the command to run are given in the data structure.

    wantframe
       Indicates that nothing of interest happened and the server is waiting on
       more frames from the client before anything interesting can be done.

    noop
       Indicates no additional action is required.

    Known Issues
    ------------

    There are no limits to the number of partially received commands or their
    size. A malicious client could stream command request data and exhaust the
    server's memory.

    Partially received commands are not acted upon when end of input is
    reached. Should the server error if it receives a partial request?
    Should the client send a message to abort a partially transmitted request
    to facilitate graceful shutdown?

    Active requests that haven't been responded to aren't tracked. This means
    that if we receive a command and instruct its dispatch, another command
    with its request ID can come in over the wire and there will be a race
    between who responds to what.
    """

    def __init__(self, deferoutput=False):
        """Construct a new server reactor.

        ``deferoutput`` can be used to indicate that no output frames should be
        instructed to be sent until input has been exhausted. In this mode,
        events that would normally generate output frames (such as a command
        response being ready) will instead defer instructing the consumer to
        send those frames. This is useful for half-duplex transports where the
        sender cannot receive until all data has been transmitted.
        """
        self._deferoutput = deferoutput
        self._state = 'idle'
        self._bufferedframegens = []
        # stream id -> stream instance for all active streams from the client.
        self._incomingstreams = {}
        # request id -> dict of commands that are actively being received.
        self._receivingcommands = {}
        # Request IDs that have been received and are actively being processed.
        # Once all output for a request has been sent, it is removed from this
        # set.
        self._activecommands = set()

    def onframerecv(self, frame):
        """Process a frame that has been received off the wire.

        Returns a 2-tuple of (action, data) that details what action,
        if any, the consumer should take next.
        """
        if not frame.streamid % 2:
            self._state = 'errored'
            return self._makeerrorresult(
                _('received frame with even numbered stream ID: %d') %
                frame.streamid)

        if frame.streamid not in self._incomingstreams:
            if not frame.streamflags & STREAM_FLAG_BEGIN_STREAM:
                self._state = 'errored'
                return self._makeerrorresult(
                    _('received frame on unknown inactive stream without '
                      'beginning of stream flag set'))

            self._incomingstreams[frame.streamid] = stream(frame.streamid)

        if frame.streamflags & STREAM_FLAG_ENCODING_APPLIED:
            # TODO handle decoding frames
            self._state = 'errored'
            raise error.ProgrammingError('support for decoding stream payloads '
                                         'not yet implemented')

        if frame.streamflags & STREAM_FLAG_END_STREAM:
            del self._incomingstreams[frame.streamid]

        handlers = {
            'idle': self._onframeidle,
            'command-receiving': self._onframecommandreceiving,
            'errored': self._onframeerrored,
        }

        meth = handlers.get(self._state)
        if not meth:
            raise error.ProgrammingError('unhandled state: %s' % self._state)

        return meth(frame)

    def onbytesresponseready(self, stream, requestid, data):
        """Signal that a bytes response is ready to be sent to the client.

        The raw bytes response is passed as an argument.
        """
        ensureserverstream(stream)

        def sendframes():
            for frame in createbytesresponseframesfrombytes(stream, requestid,
                                                            data):
                yield frame

            self._activecommands.remove(requestid)

        result = sendframes()

        if self._deferoutput:
            self._bufferedframegens.append(result)
            return 'noop', {}
        else:
            return 'sendframes', {
                'framegen': result,
            }

    def oninputeof(self):
        """Signals that end of input has been received.

        No more frames will be received. All pending activity should be
        completed.
        """
        # TODO should we do anything about in-flight commands?

        if not self._deferoutput or not self._bufferedframegens:
            return 'noop', {}

        # If we buffered all our responses, emit those.
        def makegen():
            for gen in self._bufferedframegens:
                for frame in gen:
                    yield frame

        return 'sendframes', {
            'framegen': makegen(),
        }
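
The deferred-output behaviour above (responses are queued as generators and only emitted once input ends) can be illustrated with a small stand-alone sketch. `outputbuffer` and `onresponseready` are hypothetical names used only here, and frames are represented as plain byte strings rather than real frame objects.

```python
class outputbuffer(object):
    """Toy stand-in for the reactor's output handling (hypothetical)."""

    def __init__(self, deferoutput):
        self._deferoutput = deferoutput
        self._bufferedframegens = []

    def onresponseready(self, frames):
        # Wrap the frames in a lazy generator, as the reactor does.
        gen = (frame for frame in frames)

        if self._deferoutput:
            # Half-duplex mode: hold on to the generator for later.
            self._bufferedframegens.append(gen)
            return 'noop', {}

        return 'sendframes', {'framegen': gen}

    def oninputeof(self):
        if not self._deferoutput or not self._bufferedframegens:
            return 'noop', {}

        # Chain the buffered generators in the order they were queued.
        def makegen():
            for gen in self._bufferedframegens:
                for frame in gen:
                    yield frame

        return 'sendframes', {'framegen': makegen()}


buf = outputbuffer(deferoutput=True)
buf.onresponseready([b'a1', b'a2'])
buf.onresponseready([b'b1'])
action, meta = buf.oninputeof()
print(action, list(meta['framegen']))
```

Because the buffered items are generators, no frame is actually produced until the consumer iterates the final `framegen`.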

    def onapplicationerror(self, stream, requestid, msg):
        ensureserverstream(stream)

        return 'sendframes', {
            'framegen': createerrorframe(stream, requestid, msg,
                                         application=True),
        }

    def _makeerrorresult(self, msg):
        return 'error', {
            'message': msg,
        }

    def _makeruncommandresult(self, requestid):
        entry = self._receivingcommands[requestid]
        del self._receivingcommands[requestid]

        if self._receivingcommands:
            self._state = 'command-receiving'
        else:
            self._state = 'idle'

        assert requestid not in self._activecommands
        self._activecommands.add(requestid)

        return 'runcommand', {
            'requestid': requestid,
            'command': entry['command'],
            'args': entry['args'],
            'data': entry['data'].getvalue() if entry['data'] else None,
        }

    def _makewantframeresult(self):
        return 'wantframe', {
            'state': self._state,
        }

    def _onframeidle(self, frame):
        # The only frame type that should be received in this state is a
        # command request.
        if frame.typeid != FRAME_TYPE_COMMAND_NAME:
            self._state = 'errored'
            return self._makeerrorresult(
                _('expected command frame; got %d') % frame.typeid)

        if frame.requestid in self._receivingcommands:
            self._state = 'errored'
            return self._makeerrorresult(
                _('request with ID %d already received') % frame.requestid)

        if frame.requestid in self._activecommands:
            self._state = 'errored'
            return self._makeerrorresult((
                _('request with ID %d is already active') % frame.requestid))

        expectingargs = bool(frame.flags & FLAG_COMMAND_NAME_HAVE_ARGS)
        expectingdata = bool(frame.flags & FLAG_COMMAND_NAME_HAVE_DATA)

        self._receivingcommands[frame.requestid] = {
            'command': frame.payload,
            'args': {},
            'data': None,
            'expectingargs': expectingargs,
            'expectingdata': expectingdata,
        }

        if frame.flags & FLAG_COMMAND_NAME_EOS:
            return self._makeruncommandresult(frame.requestid)

        if expectingargs or expectingdata:
            self._state = 'command-receiving'
            return self._makewantframeresult()
        else:
            self._state = 'errored'
            return self._makeerrorresult(_('missing frame flags on '
                                           'command frame'))

    def _onframecommandreceiving(self, frame):
        # It could be a new command request. Process it as such.
        if frame.typeid == FRAME_TYPE_COMMAND_NAME:
            return self._onframeidle(frame)

        # All other frames should be related to a command that is currently
        # receiving but is not active.
        if frame.requestid in self._activecommands:
            self._state = 'errored'
            return self._makeerrorresult(
                _('received frame for request that is still active: %d') %
                frame.requestid)

        if frame.requestid not in self._receivingcommands:
            self._state = 'errored'
            return self._makeerrorresult(
                _('received frame for request that is not receiving: %d') %
                frame.requestid)

        entry = self._receivingcommands[frame.requestid]

        if frame.typeid == FRAME_TYPE_COMMAND_ARGUMENT:
            if not entry['expectingargs']:
                self._state = 'errored'
                return self._makeerrorresult(_(
                    'received command argument frame for request that is not '
                    'expecting arguments: %d') % frame.requestid)

            return self._handlecommandargsframe(frame, entry)

        elif frame.typeid == FRAME_TYPE_COMMAND_DATA:
            if not entry['expectingdata']:
                self._state = 'errored'
                return self._makeerrorresult(_(
                    'received command data frame for request that is not '
                    'expecting data: %d') % frame.requestid)

            if entry['data'] is None:
                entry['data'] = util.bytesio()

            return self._handlecommanddataframe(frame, entry)

    def _handlecommandargsframe(self, frame, entry):
        # The frame and state of command should have already been validated.
        assert frame.typeid == FRAME_TYPE_COMMAND_ARGUMENT

        offset = 0
        namesize, valuesize = ARGUMENT_FRAME_HEADER.unpack_from(frame.payload)
        offset += ARGUMENT_FRAME_HEADER.size

        # The argument name MUST fit inside the frame.
        argname = bytes(frame.payload[offset:offset + namesize])
        offset += namesize

        if len(argname) != namesize:
            self._state = 'errored'
            return self._makeerrorresult(_('malformed argument frame: '
                                           'partial argument name'))

        argvalue = bytes(frame.payload[offset:])

        # Argument value spans multiple frames. Record our active state
        # and wait for the next frame.
        if frame.flags & FLAG_COMMAND_ARGUMENT_CONTINUATION:
            raise error.ProgrammingError('not yet implemented')

        # Common case: the argument value is completely contained in this
        # frame.

        if len(argvalue) != valuesize:
            self._state = 'errored'
            return self._makeerrorresult(_('malformed argument frame: '
                                           'partial argument value'))

        entry['args'][argname] = argvalue

        if frame.flags & FLAG_COMMAND_ARGUMENT_EOA:
            if entry['expectingdata']:
                # TODO signal request to run a command once we don't
                # buffer data frames.
                return self._makewantframeresult()
            else:
                return self._makeruncommandresult(frame.requestid)
        else:
            return self._makewantframeresult()
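
The argument frame layout parsed above (a fixed-size header carrying the name and value sizes, followed by the name and then the value) can be exercised with a short round-trip sketch. The header format `'<HH'` is an assumption made for illustration, and `encodeargument` and `decodeargument` are hypothetical helpers rather than functions from this module. Unlike the method above, the sketch also guards against a payload shorter than the header, since `unpack_from` would otherwise raise `struct.error`.

```python
import struct

# Assumed layout: two little-endian unsigned 16-bit sizes (name, value).
ARGUMENT_FRAME_HEADER = struct.Struct('<HH')


def encodeargument(name, value):
    """Build an argument frame payload from two byte strings."""
    return ARGUMENT_FRAME_HEADER.pack(len(name), len(value)) + name + value


def decodeargument(payload):
    """Return (name, value), raising ValueError on a truncated payload."""
    if len(payload) < ARGUMENT_FRAME_HEADER.size:
        raise ValueError('malformed argument frame: partial header')

    namesize, valuesize = ARGUMENT_FRAME_HEADER.unpack_from(payload)
    offset = ARGUMENT_FRAME_HEADER.size

    # Slicing never raises, so a short payload shows up as a short name.
    name = bytes(payload[offset:offset + namesize])
    offset += namesize
    if len(name) != namesize:
        raise ValueError('malformed argument frame: partial argument name')

    # Everything after the name is the value; it must match the header.
    value = bytes(payload[offset:])
    if len(value) != valuesize:
        raise ValueError('malformed argument frame: partial argument value')

    return name, value


payload = encodeargument(b'key', b'value')
print(decodeargument(payload))
```

Comparing the sliced length against the declared size is what detects truncation, which is the same check the method performs for both the name and the value.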

    def _handlecommanddataframe(self, frame, entry):
        assert frame.typeid == FRAME_TYPE_COMMAND_DATA

        # TODO support streaming data instead of buffering it.
        entry['data'].write(frame.payload)

        if frame.flags & FLAG_COMMAND_DATA_CONTINUATION:
            return self._makewantframeresult()
        elif frame.flags & FLAG_COMMAND_DATA_EOS:
            entry['data'].seek(0)
            return self._makeruncommandresult(frame.requestid)
        else:
            self._state = 'errored'
            return self._makeerrorresult(_('command data frame without '
                                           'flags'))
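
The data frame handling above buffers every payload until a frame flagged as end of stream arrives. A minimal sketch of that accumulation follows. `collectdata` is a hypothetical helper, frames are modelled as `(flags, payload)` pairs, and the flag values are assumptions made only for this example.

```python
import io

# Assumed flag values for illustration; the module's constants may differ.
FLAG_COMMAND_DATA_CONTINUATION = 0x01
FLAG_COMMAND_DATA_EOS = 0x02


def collectdata(frames):
    """Concatenate data frame payloads until the end-of-stream flag.

    ``frames`` is an iterable of ``(flags, payload)`` pairs.
    """
    buf = io.BytesIO()

    for flags, payload in frames:
        buf.write(payload)

        if flags & FLAG_COMMAND_DATA_CONTINUATION:
            # More data frames are expected for this request.
            continue
        elif flags & FLAG_COMMAND_DATA_EOS:
            return buf.getvalue()
        else:
            raise ValueError('command data frame without flags')

    raise ValueError('input ended before end of command data')


frames = [
    (FLAG_COMMAND_DATA_CONTINUATION, b'hello '),
    (FLAG_COMMAND_DATA_EOS, b'world'),
]
print(collectdata(frames))
```

As in the method above, the continuation flag is tested first, so a frame carrying both flags is treated as a continuation.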

    def _onframeerrored(self, frame):
        return self._makeerrorresult(_('server already errored'))