wireprotoframing.py
720 lines
| 23.9 KiB
| text/x-python
|
PythonLexer
/ mercurial / wireprotoframing.py
Gregory Szorc
|
r37069 | # wireprotoframing.py - unified framing protocol for wire protocol | ||
# | ||||
# Copyright 2018 Gregory Szorc <gregory.szorc@gmail.com> | ||||
# | ||||
# This software may be used and distributed according to the terms of the | ||||
# GNU General Public License version 2 or any later version. | ||||
# This file contains functionality to support the unified frame-based wire | ||||
# protocol. For details about the protocol, see | ||||
# `hg help internals.wireprotocol`. | ||||
from __future__ import absolute_import | ||||
import struct | ||||
Gregory Szorc
|
r37070 | from .i18n import _ | ||
Gregory Szorc
|
r37079 | from .thirdparty import ( | ||
attr, | ||||
) | ||||
Gregory Szorc
|
r37069 | from . import ( | ||
Gregory Szorc
|
r37070 | error, | ||
Gregory Szorc
|
r37069 | util, | ||
) | ||||
Yuya Nishihara
|
r37102 | from .utils import ( | ||
stringutil, | ||||
) | ||||
Gregory Szorc
|
r37069 | |||
Gregory Szorc
|
# Size in bytes of the fixed frame header: 24 bits of payload length,
# 16 bits of request ID, 4 bits of frame type and 4 bits of frame flags.
FRAME_HEADER_SIZE = 6

# Default upper bound, in bytes, used when chunking data into frame payloads.
DEFAULT_MAX_FRAME_SIZE = 32768

# Numeric frame type identifiers. They are stored in the upper 4 bits of the
# last header byte.
FRAME_TYPE_COMMAND_NAME = 0x01
FRAME_TYPE_COMMAND_ARGUMENT = 0x02
FRAME_TYPE_COMMAND_DATA = 0x03
FRAME_TYPE_BYTES_RESPONSE = 0x04
FRAME_TYPE_ERROR_RESPONSE = 0x05
FRAME_TYPE_TEXT_OUTPUT = 0x06

# Maps human readable frame type names to their numeric identifiers.
FRAME_TYPES = {
    b'command-name': FRAME_TYPE_COMMAND_NAME,
    b'command-argument': FRAME_TYPE_COMMAND_ARGUMENT,
    b'command-data': FRAME_TYPE_COMMAND_DATA,
    b'bytes-response': FRAME_TYPE_BYTES_RESPONSE,
    b'error-response': FRAME_TYPE_ERROR_RESPONSE,
    b'text-output': FRAME_TYPE_TEXT_OUTPUT,
}

# Flags are stored in the lower 4 bits of the last header byte. The same bit
# value means different things for different frame types, hence one group of
# constants (and one name -> value table) per frame type.

# Flags for command name frames.
FLAG_COMMAND_NAME_EOS = 0x01
FLAG_COMMAND_NAME_HAVE_ARGS = 0x02
FLAG_COMMAND_NAME_HAVE_DATA = 0x04

FLAGS_COMMAND = {
    b'eos': FLAG_COMMAND_NAME_EOS,
    b'have-args': FLAG_COMMAND_NAME_HAVE_ARGS,
    b'have-data': FLAG_COMMAND_NAME_HAVE_DATA,
}

# Flags for command argument frames.
FLAG_COMMAND_ARGUMENT_CONTINUATION = 0x01
FLAG_COMMAND_ARGUMENT_EOA = 0x02

FLAGS_COMMAND_ARGUMENT = {
    b'continuation': FLAG_COMMAND_ARGUMENT_CONTINUATION,
    b'eoa': FLAG_COMMAND_ARGUMENT_EOA,
}

# Flags for command data frames.
FLAG_COMMAND_DATA_CONTINUATION = 0x01
FLAG_COMMAND_DATA_EOS = 0x02

FLAGS_COMMAND_DATA = {
    b'continuation': FLAG_COMMAND_DATA_CONTINUATION,
    b'eos': FLAG_COMMAND_DATA_EOS,
}

# Flags for bytes response frames.
FLAG_BYTES_RESPONSE_CONTINUATION = 0x01
FLAG_BYTES_RESPONSE_EOS = 0x02

FLAGS_BYTES_RESPONSE = {
    b'continuation': FLAG_BYTES_RESPONSE_CONTINUATION,
    b'eos': FLAG_BYTES_RESPONSE_EOS,
}

# Flags for error response frames.
FLAG_ERROR_RESPONSE_PROTOCOL = 0x01
FLAG_ERROR_RESPONSE_APPLICATION = 0x02

FLAGS_ERROR_RESPONSE = {
    b'protocol': FLAG_ERROR_RESPONSE_PROTOCOL,
    b'application': FLAG_ERROR_RESPONSE_APPLICATION,
}

# Maps frame types to their available flags.
FRAME_TYPE_FLAGS = {
    FRAME_TYPE_COMMAND_NAME: FLAGS_COMMAND,
    FRAME_TYPE_COMMAND_ARGUMENT: FLAGS_COMMAND_ARGUMENT,
    FRAME_TYPE_COMMAND_DATA: FLAGS_COMMAND_DATA,
    FRAME_TYPE_BYTES_RESPONSE: FLAGS_BYTES_RESPONSE,
    FRAME_TYPE_ERROR_RESPONSE: FLAGS_ERROR_RESPONSE,
    FRAME_TYPE_TEXT_OUTPUT: {},
}

# Header that prefixes the payload of a command argument frame: two
# little-endian unsigned 16-bit integers holding the size of the argument
# name and the size of the argument value.
ARGUMENT_FRAME_HEADER = struct.Struct(r'<HH')
Gregory Szorc
|
@attr.s(slots=True)
class frameheader(object):
    """Represents the data in a frame header."""

    # Size of the frame payload in bytes; the header itself is not counted.
    length = attr.ib()
    # Identifier of the request this frame belongs to.
    requestid = attr.ib()
    # Numeric frame type (one of the ``FRAME_TYPE_*`` values).
    typeid = attr.ib()
    # Bit flags; their meaning depends on the frame type.
    flags = attr.ib()
@attr.s(slots=True)
class frame(object):
    """Represents a parsed frame."""

    # Identifier of the request this frame belongs to.
    requestid = attr.ib()
    # Numeric frame type (one of the ``FRAME_TYPE_*`` values).
    typeid = attr.ib()
    # Bit flags; their meaning depends on the frame type.
    flags = attr.ib()
    # Raw payload bytes that followed the frame header.
    payload = attr.ib()
Gregory Szorc
|
def makeframe(requestid, typeid, flags, payload):
    """Assemble a frame into a byte array.

    ``requestid`` is packed as a little-endian unsigned 16-bit integer.
    ``typeid`` and ``flags`` share the last header byte (4 bits each, type in
    the upper half). ``payload`` is the bytes-like frame body.

    Returns a ``bytearray`` holding the header followed by the payload.

    Raises ``ValueError`` if the payload is too large for its size to be
    represented in the 24-bit length field.
    """
    payloadlen = len(payload)

    # Only the low 3 bytes of the length are written to the header. A larger
    # payload would silently produce a frame whose advertised length does not
    # match its content, so refuse it up front.
    if payloadlen > 0xffffff:
        raise ValueError('frame payload too large: %d bytes' % payloadlen)

    frame = bytearray(FRAME_HEADER_SIZE + payloadlen)

    # 24 bits length
    # 16 bits request id
    # 4 bits type
    # 4 bits flags

    # There is no 3 byte struct format: pack as 4 bytes little-endian and
    # keep the 3 least significant ones.
    l = struct.pack(r'<I', payloadlen)
    frame[0:3] = l[0:3]
    struct.pack_into(r'<H', frame, 3, requestid)
    frame[5] = (typeid << 4) | flags
    frame[FRAME_HEADER_SIZE:] = payload

    return frame
def makeframefromhumanstring(s):
    """Build a frame from its human readable description.

    The description is a bytes string of the form:

        <request-id> <type> <flags> <payload>

    This is intended for user-facing applications and tests, which can
    describe frames without spelling out numeric constants.

    The request ID is an integer. The frame type is either a name from
    ``FRAME_TYPES`` or an integer. Flags are names valid for the frame type
    or integers, and several of them can be joined with `|` to bitwise OR
    them together. The payload is passed through
    ``stringutil.unescapestr()`` before being put in the frame.
    """
    rawid, rawtype, rawflags, rawpayload = s.split(b' ', 3)

    reqid = int(rawid)

    # Named types take precedence; anything else must parse as an integer.
    typeid = FRAME_TYPES[rawtype] if rawtype in FRAME_TYPES else int(rawtype)

    knownflags = FRAME_TYPE_FLAGS[typeid]
    flagbits = 0
    for token in rawflags.split(b'|'):
        value = knownflags.get(token)
        if value is None:
            # Not a flag name for this frame type: treat it as a number.
            value = int(token)
        flagbits |= value

    body = stringutil.unescapestr(rawpayload)

    return makeframe(requestid=reqid, typeid=typeid, flags=flagbits,
                     payload=body)
Gregory Szorc
|
r37069 | |||
Gregory Szorc
|
def parseheader(data):
    """Parse a unified framing protocol frame header from a buffer.

    The header is expected to be in the buffer at offset 0 and the
    buffer is expected to be large enough to hold a full header.

    ``data`` must produce integers when indexed (e.g. a ``bytearray``),
    since the length bytes are combined arithmetically.

    Returns a ``frameheader`` built from the decoded payload length,
    request ID, frame type and frame flags.
    """
    # 24 bits payload length (little endian)
    # 16 bits request ID (little endian)
    # 4 bits frame type
    # 4 bits frame flags
    # ... payload

    # Little-endian 24-bit integer: the third byte carries bits 16-23, so its
    # weight is 2**16 (65536).
    framelength = data[0] + 256 * data[1] + 65536 * data[2]
    requestid = struct.unpack_from(r'<H', data, 3)[0]
    typeflags = data[5]

    frametype = (typeflags & 0xf0) >> 4
    frameflags = typeflags & 0x0f

    return frameheader(framelength, requestid, frametype, frameflags)
Gregory Szorc
|
r37070 | |||
def readframe(fh):
    """Read a unified framing protocol frame from a file object.

    ``fh`` must provide ``readinto()`` and ``read()``.

    Returns a ``frame`` instance for the decoded frame, or None if no bytes
    at all could be read for the header.

    Raises ``error.Abort`` if only part of a header is available or if the
    payload is shorter than the length announced in the header.
    """
    headerbuf = bytearray(FRAME_HEADER_SIZE)
    nread = fh.readinto(headerbuf)

    # Nothing to read: there is no frame available.
    if nread == 0:
        return None

    if nread != FRAME_HEADER_SIZE:
        raise error.Abort(_('received incomplete frame: got %d bytes: %s') %
                          (nread, headerbuf))

    parsed = parseheader(headerbuf)

    body = fh.read(parsed.length)
    if len(body) != parsed.length:
        raise error.Abort(_('frame length error: expected %d; got %d') %
                          (parsed.length, len(body)))

    return frame(parsed.requestid, parsed.typeid, parsed.flags, body)
Gregory Szorc
|
r37070 | |||
Gregory Szorc
|
def createcommandframes(requestid, cmd, args, datafh=None):
    """Create frames necessary to transmit a request to run a command.

    ``cmd`` is the command name and becomes the payload of the command name
    frame. ``args`` maps argument names to values; one command argument
    frame is emitted per entry, in sorted key order. ``datafh``, if given,
    is a file object whose content is sent as command data frames.

    This is a generator of bytearrays. Each item represents a frame
    ready to be sent over the wire to a peer.

    Raises ``error.ProgrammingError`` if ``datafh`` returns fewer bytes than
    requested while more data is still available.
    """
    flags = 0
    if args:
        flags |= FLAG_COMMAND_NAME_HAVE_ARGS
    if datafh:
        flags |= FLAG_COMMAND_NAME_HAVE_DATA

    # A command without arguments or data is complete after the name frame.
    if not flags:
        flags |= FLAG_COMMAND_NAME_EOS

    yield makeframe(requestid=requestid, typeid=FRAME_TYPE_COMMAND_NAME,
                    flags=flags, payload=cmd)

    for i, k in enumerate(sorted(args)):
        v = args[k]
        last = i == len(args) - 1

        # TODO handle splitting of argument values across frames.
        # Payload layout: name size, value size, name, value.
        payload = bytearray(ARGUMENT_FRAME_HEADER.size + len(k) + len(v))
        offset = 0
        ARGUMENT_FRAME_HEADER.pack_into(payload, offset, len(k), len(v))
        offset += ARGUMENT_FRAME_HEADER.size
        payload[offset:offset + len(k)] = k
        offset += len(k)
        payload[offset:offset + len(v)] = v

        flags = FLAG_COMMAND_ARGUMENT_EOA if last else 0
        yield makeframe(requestid=requestid,
                        typeid=FRAME_TYPE_COMMAND_ARGUMENT,
                        flags=flags,
                        payload=payload)

    if datafh:
        while True:
            data = datafh.read(DEFAULT_MAX_FRAME_SIZE)

            done = False
            if len(data) == DEFAULT_MAX_FRAME_SIZE:
                flags = FLAG_COMMAND_DATA_CONTINUATION
            else:
                # A short read is taken to mean end of input. Verify that
                # with an explicit check rather than an ``assert``, which is
                # stripped under ``python -O`` and would then let remaining
                # data be dropped silently.
                if datafh.read(1) != b'':
                    raise error.ProgrammingError(
                        'short read from command data file object before '
                        'end of input')
                flags = FLAG_COMMAND_DATA_EOS
                done = True

            yield makeframe(requestid=requestid,
                            typeid=FRAME_TYPE_COMMAND_DATA,
                            flags=flags,
                            payload=data)

            if done:
                break
Gregory Szorc
|
r37070 | |||
Gregory Szorc
|
def createbytesresponseframesfrombytes(requestid, data,
                                       maxframesize=DEFAULT_MAX_FRAME_SIZE):
    """Generate the frames carrying a bytes response for static bytes input.

    ``data`` is cut into payloads of at most ``maxframesize`` bytes. Every
    frame but the last one carries the continuation flag; the last one
    carries the end-of-stream flag.

    Returns a generator of bytearrays.
    """
    total = len(data)

    # Everything fits in one frame (this also covers empty responses).
    if total <= maxframesize:
        yield makeframe(requestid=requestid,
                        typeid=FRAME_TYPE_BYTES_RESPONSE,
                        flags=FLAG_BYTES_RESPONSE_EOS,
                        payload=data)
        return

    position = 0
    while True:
        piece = data[position:position + maxframesize]
        position += len(piece)
        finished = position == total

        yield makeframe(requestid=requestid,
                        typeid=FRAME_TYPE_BYTES_RESPONSE,
                        flags=(FLAG_BYTES_RESPONSE_EOS if finished
                               else FLAG_BYTES_RESPONSE_CONTINUATION),
                        payload=piece)

        if finished:
            break
Gregory Szorc
|
def createerrorframe(requestid, msg, protocol=False, application=False):
    """Generate the single error response frame carrying ``msg``.

    ``protocol`` and ``application`` select which error flags are set on the
    frame; both, either or none may be requested.

    This is a generator yielding one bytearray.
    """
    # TODO properly handle frame size limits.
    assert len(msg) <= DEFAULT_MAX_FRAME_SIZE

    protocolbit = FLAG_ERROR_RESPONSE_PROTOCOL if protocol else 0
    applicationbit = FLAG_ERROR_RESPONSE_APPLICATION if application else 0

    yield makeframe(requestid=requestid,
                    typeid=FRAME_TYPE_ERROR_RESPONSE,
                    flags=protocolbit | applicationbit,
                    payload=msg)
Gregory Szorc
|
r37073 | |||
Gregory Szorc
|
def createtextoutputframe(requestid, atoms):
    """Create a text output frame to render text to people.

    ``atoms`` is an iterable of 3-tuples of (formatting string, args,
    labels).

    The formatting string contains ``%s`` tokens to be replaced by the
    corresponding indexed entry in ``args``. ``labels`` is an iterable of
    formatters to be applied at rendering time. In terms of the ``ui``
    class, each atom corresponds to a ``ui.write()``.

    Each atom is encoded as: formatting string size (16 bits), label count
    and argument count (8 bits each), one 8-bit size per label, one 16-bit
    size per argument, then the formatting string, the labels and the
    arguments. All integers are little-endian.

    This is a generator yielding a single bytearray. ``ValueError`` is raised
    if an atom cannot be encoded or if all atoms do not fit in one frame.
    """
    used = 0
    encodedatoms = []

    for (formatting, args, labels) in atoms:
        # Counts are transmitted as single bytes.
        if len(args) > 255:
            raise ValueError('cannot use more than 255 formatting arguments')
        if len(labels) > 255:
            raise ValueError('cannot use more than 255 labels')

        # TODO look for localstr, other types here?

        if not isinstance(formatting, bytes):
            raise ValueError('must use bytes formatting strings')
        if any(not isinstance(value, bytes) for value in args):
            raise ValueError('must use bytes for arguments')
        if any(not isinstance(value, bytes) for value in labels):
            raise ValueError('must use bytes for labels')

        # Formatting string and arguments must be UTF-8: invalid sequences
        # are substituted rather than rejected.
        fmt = formatting.decode(r'utf-8', r'replace').encode(r'utf-8')
        encargs = [value.decode(r'utf-8', r'replace').encode(r'utf-8')
                   for value in args]
        # Labels must be ASCII: anything else is an error.
        enclabels = [value.decode(r'ascii', r'strict').encode(r'ascii')
                     for value in labels]

        # Sizes must fit in the integer widths used to transmit them.
        if len(fmt) > 65535:
            raise ValueError('formatting string cannot be longer than 64k')
        if any(len(value) > 65535 for value in encargs):
            raise ValueError('argument string cannot be longer than 64k')
        if any(len(value) > 255 for value in enclabels):
            raise ValueError('label string cannot be longer than 255 bytes')

        labelsizes = [len(value) for value in enclabels]
        argsizes = [len(value) for value in encargs]

        pieces = [
            struct.pack(r'<HBB', len(fmt), len(enclabels), len(encargs)),
            struct.pack(r'<' + r'B' * len(labelsizes) + r'H' * len(argsizes),
                        *(labelsizes + argsizes)),
            fmt,
        ]
        pieces += enclabels
        pieces += encargs

        atom = b''.join(pieces)
        encodedatoms.append(atom)

        used += len(atom)
        if used > DEFAULT_MAX_FRAME_SIZE:
            raise ValueError('cannot encode data in a single frame')

    yield makeframe(requestid=requestid,
                    typeid=FRAME_TYPE_TEXT_OUTPUT,
                    flags=0,
                    payload=b''.join(encodedatoms))
Gregory Szorc
|
r37078 | |||
Gregory Szorc
|
class serverreactor(object):
    """Holds state of a server handling frame-based protocol requests.

    This class is the "brain" of the unified frame-based protocol server
    component. While the protocol is stateless from the perspective of
    requests/commands, something needs to track which frames have been
    received, what frames to expect, etc. This class is that thing.

    Instances are modeled as a state machine of sorts. Instances are also
    reactionary to external events. The point of this class is to encapsulate
    the state of the connection and the exchange of frames, not to perform
    work. Instead, callers tell this class when something occurs, like a
    frame arriving. If that activity is worthy of a follow-up action (say
    *run a command*), the return value of that handler will say so.

    I/O and CPU intensive operations are purposefully delegated outside of
    this class.

    Consumers are expected to tell instances when events occur. They do so by
    calling the various ``on*`` methods. These methods return a 2-tuple
    describing any follow-up action(s) to take. The first element is the
    name of an action to perform. The second is a data structure (usually
    a dict) specific to that action that contains more information. e.g.
    if the server wants to send frames back to the client, the data structure
    will contain a reference to those frames.

    Valid actions that consumers can be instructed to take are:

    sendframes
       Indicates that frames should be sent to the client. The ``framegen``
       key contains a generator of frames that should be sent. The server
       assumes that all frames are sent to the client.

    error
       Indicates that an error occurred. Consumer should probably abort.

    runcommand
       Indicates that the consumer should run a wire protocol command. Details
       of the command to run are given in the data structure.

    wantframe
       Indicates that nothing of interest happened and the server is waiting on
       more frames from the client before anything interesting can be done.

    noop
       Indicates no additional action is required.

    Known Issues
    ------------

    There are no limits to the number of partially received commands or their
    size. A malicious client could stream command request data and exhaust the
    server's memory.

    Partially received commands are not acted upon when end of input is
    reached. Should the server error if it receives a partial request?
    Should the client send a message to abort a partially transmitted request
    to facilitate graceful shutdown?

    Active requests that haven't been responded to aren't tracked. This means
    that if we receive a command and instruct its dispatch, another command
    with its request ID can come in over the wire and there will be a race
    between who responds to what.
    """

    def __init__(self, deferoutput=False):
        """Construct a new server reactor.

        ``deferoutput`` can be used to indicate that no output frames should be
        instructed to be sent until input has been exhausted. In this mode,
        events that would normally generate output frames (such as a command
        response being ready) will instead defer instructing the consumer to
        send those frames. This is useful for half-duplex transports where the
        sender cannot receive until all data has been transmitted.
        """
        self._deferoutput = deferoutput
        self._state = 'idle'
        self._bufferedframegens = []
        # request id -> dict of commands that are actively being received.
        self._receivingcommands = {}
        # Request IDs that have been received and are actively being processed.
        # Once all output for a request has been sent, it is removed from this
        # set.
        self._activecommands = set()

    def onframerecv(self, frame):
        """Process a frame that has been received off the wire.

        Returns a 2-tuple of (action, data) that details what action,
        if any, the consumer should take next.

        Raises ``error.ProgrammingError`` if the reactor is in a state that
        has no frame handler.
        """
        handlers = {
            'idle': self._onframeidle,
            'command-receiving': self._onframecommandreceiving,
            'errored': self._onframeerrored,
        }

        meth = handlers.get(self._state)
        if not meth:
            raise error.ProgrammingError('unhandled state: %s' % self._state)

        return meth(frame)

    def onbytesresponseready(self, requestid, data):
        """Signal that a bytes response is ready to be sent to the client.

        The raw bytes response is passed as an argument.

        Returns a ``sendframes`` action, or ``noop`` when output is deferred
        (the frames are then buffered until ``oninputeof()``).
        """
        def sendframes():
            for frame in createbytesresponseframesfrombytes(requestid, data):
                yield frame

            # Only reached once the consumer has drained the generator: the
            # request stops being active when all of its frames were emitted.
            self._activecommands.remove(requestid)

        result = sendframes()

        if self._deferoutput:
            self._bufferedframegens.append(result)
            return 'noop', {}
        else:
            return 'sendframes', {
                'framegen': result,
            }

    def oninputeof(self):
        """Signals that end of input has been received.

        No more frames will be received. All pending activity should be
        completed.

        Returns a ``sendframes`` action chaining every buffered response when
        output was deferred and something was buffered, ``noop`` otherwise.
        """
        # TODO should we do anything about in-flight commands?

        if not self._deferoutput or not self._bufferedframegens:
            return 'noop', {}

        # If we buffered all our responses, emit those.
        def makegen():
            for gen in self._bufferedframegens:
                for frame in gen:
                    yield frame

        return 'sendframes', {
            'framegen': makegen(),
        }

    def onapplicationerror(self, requestid, msg):
        """Signal that an application error occurred for a request.

        Returns a ``sendframes`` action whose frame generator produces an
        error response frame flagged as an application error.
        """
        return 'sendframes', {
            'framegen': createerrorframe(requestid, msg, application=True),
        }

    def _makeerrorresult(self, msg):
        """Build the ``error`` action result carrying ``msg``."""
        return 'error', {
            'message': msg,
        }

    def _makeruncommandresult(self, requestid):
        """Promote a fully received command to active and request its run.

        The command stops being tracked as receiving, the reactor state is
        updated according to whether other commands are still being received,
        and a ``runcommand`` action describing the command is returned.
        """
        entry = self._receivingcommands[requestid]
        del self._receivingcommands[requestid]

        if self._receivingcommands:
            self._state = 'command-receiving'
        else:
            self._state = 'idle'

        assert requestid not in self._activecommands
        self._activecommands.add(requestid)

        return 'runcommand', {
            'requestid': requestid,
            'command': entry['command'],
            'args': entry['args'],
            'data': entry['data'].getvalue() if entry['data'] else None,
        }

    def _makewantframeresult(self):
        """Build the ``wantframe`` action result exposing the current state."""
        return 'wantframe', {
            'state': self._state,
        }

    def _onframeidle(self, frame):
        """Handle a frame that must start a new command request."""
        # The only frame type that should be received in this state is a
        # command request.
        if frame.typeid != FRAME_TYPE_COMMAND_NAME:
            self._state = 'errored'
            return self._makeerrorresult(
                _('expected command frame; got %d') % frame.typeid)

        if frame.requestid in self._receivingcommands:
            self._state = 'errored'
            return self._makeerrorresult(
                _('request with ID %d already received') % frame.requestid)

        if frame.requestid in self._activecommands:
            self._state = 'errored'
            return self._makeerrorresult((
                _('request with ID %d is already active') % frame.requestid))

        expectingargs = bool(frame.flags & FLAG_COMMAND_NAME_HAVE_ARGS)
        expectingdata = bool(frame.flags & FLAG_COMMAND_NAME_HAVE_DATA)

        self._receivingcommands[frame.requestid] = {
            'command': frame.payload,
            'args': {},
            'data': None,
            'expectingargs': expectingargs,
            'expectingdata': expectingdata,
        }

        if frame.flags & FLAG_COMMAND_NAME_EOS:
            return self._makeruncommandresult(frame.requestid)

        if expectingargs or expectingdata:
            self._state = 'command-receiving'
            return self._makewantframeresult()
        else:
            self._state = 'errored'
            return self._makeerrorresult(_('missing frame flags on '
                                           'command frame'))

    def _onframecommandreceiving(self, frame):
        """Handle a frame while at least one command is being received."""
        # It could be a new command request. Process it as such.
        if frame.typeid == FRAME_TYPE_COMMAND_NAME:
            return self._onframeidle(frame)

        # All other frames should be related to a command that is currently
        # receiving but is not active.
        if frame.requestid in self._activecommands:
            self._state = 'errored'
            return self._makeerrorresult(
                _('received frame for request that is still active: %d') %
                frame.requestid)

        if frame.requestid not in self._receivingcommands:
            self._state = 'errored'
            return self._makeerrorresult(
                _('received frame for request that is not receiving: %d') %
                frame.requestid)

        entry = self._receivingcommands[frame.requestid]

        if frame.typeid == FRAME_TYPE_COMMAND_ARGUMENT:
            if not entry['expectingargs']:
                self._state = 'errored'
                return self._makeerrorresult(_(
                    'received command argument frame for request that is not '
                    'expecting arguments: %d') % frame.requestid)

            return self._handlecommandargsframe(frame, entry)

        elif frame.typeid == FRAME_TYPE_COMMAND_DATA:
            if not entry['expectingdata']:
                self._state = 'errored'
                return self._makeerrorresult(_(
                    'received command data frame for request that is not '
                    'expecting data: %d') % frame.requestid)

            if entry['data'] is None:
                entry['data'] = util.bytesio()

            return self._handlecommanddataframe(frame, entry)

        else:
            # Any other frame type is not valid while receiving a command.
            # Report it instead of implicitly returning None, which callers
            # expecting a 2-tuple cannot handle.
            self._state = 'errored'
            return self._makeerrorresult(_(
                'received unexpected frame type: %d') % frame.typeid)

    def _handlecommandargsframe(self, frame, entry):
        """Record the argument carried by a command argument frame."""
        # The frame and state of command should have already been validated.
        assert frame.typeid == FRAME_TYPE_COMMAND_ARGUMENT

        # The payload comes from the peer: make sure the argument header is
        # entirely there before decoding it, otherwise unpacking would raise
        # instead of reporting a protocol error.
        if len(frame.payload) < ARGUMENT_FRAME_HEADER.size:
            self._state = 'errored'
            return self._makeerrorresult(_('malformed argument frame: '
                                           'missing argument header'))

        offset = 0
        namesize, valuesize = ARGUMENT_FRAME_HEADER.unpack_from(frame.payload)
        offset += ARGUMENT_FRAME_HEADER.size

        # The argument name MUST fit inside the frame.
        argname = bytes(frame.payload[offset:offset + namesize])
        offset += namesize

        if len(argname) != namesize:
            self._state = 'errored'
            return self._makeerrorresult(_('malformed argument frame: '
                                           'partial argument name'))

        argvalue = bytes(frame.payload[offset:])

        # Argument value spans multiple frames. Record our active state
        # and wait for the next frame.
        if frame.flags & FLAG_COMMAND_ARGUMENT_CONTINUATION:
            raise error.ProgrammingError('not yet implemented')

        # Common case: the argument value is completely contained in this
        # frame.

        if len(argvalue) != valuesize:
            self._state = 'errored'
            return self._makeerrorresult(_('malformed argument frame: '
                                           'partial argument value'))

        entry['args'][argname] = argvalue

        if frame.flags & FLAG_COMMAND_ARGUMENT_EOA:
            if entry['expectingdata']:
                # TODO signal request to run a command once we don't
                # buffer data frames.
                return self._makewantframeresult()
            else:
                return self._makeruncommandresult(frame.requestid)
        else:
            return self._makewantframeresult()

    def _handlecommanddataframe(self, frame, entry):
        """Buffer the payload of a command data frame."""
        assert frame.typeid == FRAME_TYPE_COMMAND_DATA

        # TODO support streaming data instead of buffering it.
        entry['data'].write(frame.payload)

        if frame.flags & FLAG_COMMAND_DATA_CONTINUATION:
            return self._makewantframeresult()
        elif frame.flags & FLAG_COMMAND_DATA_EOS:
            entry['data'].seek(0)
            return self._makeruncommandresult(frame.requestid)
        else:
            self._state = 'errored'
            return self._makeerrorresult(_('command data frame without '
                                           'flags'))

    def _onframeerrored(self, frame):
        """Reject any frame received after the reactor has errored."""
        return self._makeerrorresult(_('server already errored'))