wireprotoframing.py
2086 lines
| 64.6 KiB
| text/x-python
|
PythonLexer
/ mercurial / wireprotoframing.py
Gregory Szorc
|
r37069 | # wireprotoframing.py - unified framing protocol for wire protocol | ||
# | ||||
# Copyright 2018 Gregory Szorc <gregory.szorc@gmail.com> | ||||
# | ||||
# This software may be used and distributed according to the terms of the | ||||
# GNU General Public License version 2 or any later version. | ||||
# This file contains functionality to support the unified frame-based wire | ||||
# protocol. For details about the protocol, see | ||||
# `hg help internals.wireprotocol`. | ||||
Gregory Szorc
|
r37561 | import collections | ||
Gregory Szorc
|
r37069 | import struct | ||
Gregory Szorc
|
r37070 | from .i18n import _ | ||
Augie Fackler
|
r43346 | from .thirdparty import attr | ||
Gregory Szorc
|
r37069 | from . import ( | ||
Yuya Nishihara
|
r37492 | encoding, | ||
Gregory Szorc
|
r37070 | error, | ||
Gregory Szorc
|
r40061 | pycompat, | ||
Gregory Szorc
|
r37069 | util, | ||
Gregory Szorc
|
r40056 | wireprototypes, | ||
Gregory Szorc
|
r37069 | ) | ||
Yuya Nishihara
|
r37102 | from .utils import ( | ||
Gregory Szorc
|
r39477 | cborutil, | ||
Yuya Nishihara
|
r37102 | stringutil, | ||
) | ||||
Gregory Szorc
|
r37069 | |||
Gregory Szorc
|
# Number of bytes occupied by the fixed header that precedes every payload.
FRAME_HEADER_SIZE = 8
# Default cap on the number of payload bytes placed in a single frame.
DEFAULT_MAX_FRAME_SIZE = 32 * 1024

# Stream flags (the "stream flags" byte of the frame header).
STREAM_FLAG_BEGIN_STREAM = 1 << 0
STREAM_FLAG_END_STREAM = 1 << 1
STREAM_FLAG_ENCODING_APPLIED = 1 << 2

# Human readable names for the stream flags.
STREAM_FLAGS = {
    b'stream-begin': STREAM_FLAG_BEGIN_STREAM,
    b'stream-end': STREAM_FLAG_END_STREAM,
    b'encoded': STREAM_FLAG_ENCODING_APPLIED,
}

# Frame type identifiers (the 4 bit "type" field of the frame header).
FRAME_TYPE_COMMAND_REQUEST = 0x01
FRAME_TYPE_COMMAND_DATA = 0x02
FRAME_TYPE_COMMAND_RESPONSE = 0x03
FRAME_TYPE_ERROR_RESPONSE = 0x05
FRAME_TYPE_TEXT_OUTPUT = 0x06
FRAME_TYPE_PROGRESS = 0x07
FRAME_TYPE_SENDER_PROTOCOL_SETTINGS = 0x08
FRAME_TYPE_STREAM_SETTINGS = 0x09

# Human readable names for the frame types.
FRAME_TYPES = {
    b'command-request': FRAME_TYPE_COMMAND_REQUEST,
    b'command-data': FRAME_TYPE_COMMAND_DATA,
    b'command-response': FRAME_TYPE_COMMAND_RESPONSE,
    b'error-response': FRAME_TYPE_ERROR_RESPONSE,
    b'text-output': FRAME_TYPE_TEXT_OUTPUT,
    b'progress': FRAME_TYPE_PROGRESS,
    b'sender-protocol-settings': FRAME_TYPE_SENDER_PROTOCOL_SETTINGS,
    b'stream-settings': FRAME_TYPE_STREAM_SETTINGS,
}

# Per-frame-type flags (the 4 bit "flags" field of the frame header). Each
# frame type has its own flag namespace, so numeric values repeat across
# the groups below.
FLAG_COMMAND_REQUEST_NEW = 1 << 0
FLAG_COMMAND_REQUEST_CONTINUATION = 1 << 1
FLAG_COMMAND_REQUEST_MORE_FRAMES = 1 << 2
FLAG_COMMAND_REQUEST_EXPECT_DATA = 1 << 3

FLAGS_COMMAND_REQUEST = {
    b'new': FLAG_COMMAND_REQUEST_NEW,
    b'continuation': FLAG_COMMAND_REQUEST_CONTINUATION,
    b'more': FLAG_COMMAND_REQUEST_MORE_FRAMES,
    b'have-data': FLAG_COMMAND_REQUEST_EXPECT_DATA,
}

FLAG_COMMAND_DATA_CONTINUATION = 1 << 0
FLAG_COMMAND_DATA_EOS = 1 << 1

FLAGS_COMMAND_DATA = {
    b'continuation': FLAG_COMMAND_DATA_CONTINUATION,
    b'eos': FLAG_COMMAND_DATA_EOS,
}

FLAG_COMMAND_RESPONSE_CONTINUATION = 1 << 0
FLAG_COMMAND_RESPONSE_EOS = 1 << 1

FLAGS_COMMAND_RESPONSE = {
    b'continuation': FLAG_COMMAND_RESPONSE_CONTINUATION,
    b'eos': FLAG_COMMAND_RESPONSE_EOS,
}

FLAG_SENDER_PROTOCOL_SETTINGS_CONTINUATION = 1 << 0
FLAG_SENDER_PROTOCOL_SETTINGS_EOS = 1 << 1

FLAGS_SENDER_PROTOCOL_SETTINGS = {
    b'continuation': FLAG_SENDER_PROTOCOL_SETTINGS_CONTINUATION,
    b'eos': FLAG_SENDER_PROTOCOL_SETTINGS_EOS,
}

FLAG_STREAM_ENCODING_SETTINGS_CONTINUATION = 1 << 0
FLAG_STREAM_ENCODING_SETTINGS_EOS = 1 << 1

FLAGS_STREAM_ENCODING_SETTINGS = {
    b'continuation': FLAG_STREAM_ENCODING_SETTINGS_CONTINUATION,
    b'eos': FLAG_STREAM_ENCODING_SETTINGS_EOS,
}

# Maps frame types to their available flags.
FRAME_TYPE_FLAGS = {
    FRAME_TYPE_COMMAND_REQUEST: FLAGS_COMMAND_REQUEST,
    FRAME_TYPE_COMMAND_DATA: FLAGS_COMMAND_DATA,
    FRAME_TYPE_COMMAND_RESPONSE: FLAGS_COMMAND_RESPONSE,
    FRAME_TYPE_ERROR_RESPONSE: {},
    FRAME_TYPE_TEXT_OUTPUT: {},
    FRAME_TYPE_PROGRESS: {},
    FRAME_TYPE_SENDER_PROTOCOL_SETTINGS: FLAGS_SENDER_PROTOCOL_SETTINGS,
    FRAME_TYPE_STREAM_SETTINGS: FLAGS_STREAM_ENCODING_SETTINGS,
}

# Two little-endian unsigned 16 bit integers.
ARGUMENT_RECORD_HEADER = struct.Struct('<HH')
Gregory Szorc
|
r37069 | |||
Augie Fackler
|
r43346 | |||
Gregory Szorc
|
def humanflags(mapping, value):
    """Render a numeric flags value as ``|``-joined flag names.

    ``mapping`` maps flag names (bytes) to their single-bit integer values.
    Set bits that have no entry in ``mapping`` are rendered as
    ``<unknown 0xNN>``. Names are emitted from the lowest set bit upwards.
    """
    names = {bit: name for name, bit in mapping.items()}

    rendered = []
    bit = 1
    # Walk every bit position up to the highest one that could be set.
    while bit <= value:
        if value & bit:
            if bit in names:
                rendered.append(names[bit])
            else:
                rendered.append(b'<unknown 0x%02x>' % bit)
        bit <<= 1

    return b'|'.join(rendered)
Augie Fackler
|
r43346 | |||
Gregory Szorc
|
@attr.s(slots=True)
class frameheader:
    """Holds the individual fields decoded from a frame header."""

    # Number of payload bytes that follow the header.
    length = attr.ib()
    # Identifier of the request the frame belongs to.
    requestid = attr.ib()
    # Identifier of the stream the frame was sent on.
    streamid = attr.ib()
    # Bitwise OR of ``STREAM_FLAG_*`` values.
    streamflags = attr.ib()
    # One of the ``FRAME_TYPE_*`` values.
    typeid = attr.ib()
    # Frame type specific flags.
    flags = attr.ib()
Augie Fackler
|
r43346 | |||
Gregory Szorc
|
@attr.s(slots=True, repr=False)
class frame:
    """A parsed frame: the header fields plus the payload."""

    # Identifier of the request the frame belongs to.
    requestid = attr.ib()
    # Identifier of the stream the frame was sent on.
    streamid = attr.ib()
    # Bitwise OR of ``STREAM_FLAG_*`` values.
    streamflags = attr.ib()
    # One of the ``FRAME_TYPE_*`` values.
    typeid = attr.ib()
    # Frame type specific flags.
    flags = attr.ib()
    # Payload of the frame.
    payload = attr.ib()

    @encoding.strmethod
    def __repr__(self):
        # Reverse lookup of the symbolic frame type name, falling back to
        # a hex rendering for type ids we do not know about.
        typename = next(
            (
                name
                for name, value in FRAME_TYPES.items()
                if value == self.typeid
            ),
            b'<unknown 0x%02x>' % self.typeid,
        )

        template = (
            b'frame(size=%d; request=%d; stream=%d; streamflags=%s; '
            b'type=%s; flags=%s)'
        )
        streamflagnames = humanflags(STREAM_FLAGS, self.streamflags)
        # Unknown frame types have no flag table; every set bit is then
        # rendered as unknown.
        flagnames = humanflags(
            FRAME_TYPE_FLAGS.get(self.typeid, {}), self.flags
        )

        return template % (
            len(self.payload),
            self.requestid,
            self.streamid,
            streamflagnames,
            typename,
            flagnames,
        )
Gregory Szorc
|
r37314 | |||
Gregory Szorc
|
def makeframe(requestid, streamid, streamflags, typeid, flags, payload):
    """Serialize header fields and a payload into a single ``bytearray``.

    Header layout (little endian):

    * 24 bits payload length
    * 16 bits request id
    * 8 bits stream id
    * 8 bits stream flags
    * 4 bits frame type
    * 4 bits frame flags
    """
    # TODO assert size of payload.
    size = len(payload)
    buf = bytearray(FRAME_HEADER_SIZE + size)

    # There is no 24 bit struct format, so pack as 32 bits and keep only
    # the three low-order bytes.
    buf[0:3] = struct.pack('<I', size)[0:3]
    struct.pack_into('<HBB', buf, 3, requestid, streamid, streamflags)
    # Type lives in the high nibble, flags in the low nibble.
    buf[7] = (typeid << 4) | flags
    buf[8:] = payload

    return buf
Augie Fackler
|
r43346 | |||
Gregory Szorc
|
def _parsehumanflags(spec, names):
    """Resolve a ``|`` delimited flags string to its integer value.

    Each component is either a key of ``names`` or an integer literal.
    """
    value = 0
    for token in spec.split(b'|'):
        if token in names:
            value |= names[token]
        else:
            value |= int(token)
    return value


def makeframefromhumanstring(s):
    """Create a frame from a human readable string

    Strings have the form:

        <request-id> <stream-id> <stream-flags> <type> <flags> <payload>

    This can be used by user-facing applications and tests for creating
    frames easily without having to type out a bunch of constants.

    Request ID and stream IDs are integers.

    Stream flags, frame type, and flags can be specified by integer or
    named constant.

    Flags can be delimited by `|` to bitwise OR them together.

    If the payload begins with ``cbor:``, the following string will be
    evaluated as Python literal and the resulting object will be fed into
    a CBOR encoder. Otherwise, the payload is interpreted as a Python
    byte string literal.

    Returns the assembled frame as produced by ``makeframe()``. Raises
    ``ValueError`` if there are fewer than 6 fields or an integer field
    cannot be parsed.
    """
    fields = s.split(b' ', 5)
    requestid, streamid, streamflags, frametype, frameflags, payload = fields

    requestid = int(requestid)
    streamid = int(streamid)
    finalstreamflags = _parsehumanflags(streamflags, STREAM_FLAGS)

    if frametype in FRAME_TYPES:
        frametype = FRAME_TYPES[frametype]
    else:
        frametype = int(frametype)

    # A frame type given as a bare integer may have no flag table (e.g. when
    # crafting a frame with an unknown type). Treat it as having no named
    # flags rather than failing with KeyError; integer flags still work.
    validflags = FRAME_TYPE_FLAGS.get(frametype, {})
    finalflags = _parsehumanflags(frameflags, validflags)

    if payload.startswith(b'cbor:'):
        payload = b''.join(
            cborutil.streamencode(stringutil.evalpythonliteral(payload[5:]))
        )
    else:
        payload = stringutil.unescapestr(payload)

    return makeframe(
        requestid=requestid,
        streamid=streamid,
        streamflags=finalstreamflags,
        typeid=frametype,
        flags=finalflags,
        payload=payload,
    )
Gregory Szorc
|
r37069 | |||
Gregory Szorc
|
def parseheader(data):
    """Parse a unified framing protocol frame header from a buffer.

    The header is expected to be in the buffer at offset 0 and the
    buffer is expected to be large enough to hold a full header.
    Indexing ``data`` must yield integers (e.g. ``bytes``/``bytearray``).

    Returns a ``frameheader`` instance.
    """
    # 24 bits payload length (little endian)
    # 16 bits request ID
    # 8 bits stream ID
    # 8 bits stream flags
    # 4 bits frame type
    # 4 bits frame flags
    # ... payload

    # The third byte carries bits 16-23 of the length, so its weight is
    # 1 << 16 (65536).
    framelength = data[0] | (data[1] << 8) | (data[2] << 16)
    requestid, streamid, streamflags = struct.unpack_from('<HBB', data, 3)
    typeflags = data[7]

    # Type is the high nibble, flags the low nibble.
    frametype = (typeflags & 0xF0) >> 4
    frameflags = typeflags & 0x0F

    return frameheader(
        framelength, requestid, streamid, streamflags, frametype, frameflags
    )
Gregory Szorc
|
r37070 | |||
def readframe(fh):
    """Read a unified framing protocol frame from a file object.

    ``fh`` must provide ``readinto()`` and ``read()``.

    Returns a ``frame`` instance for the decoded frame, or ``None`` if no
    header bytes could be read. Raises ``error.Abort`` if the header or the
    payload is shorter than expected.
    """
    header = bytearray(FRAME_HEADER_SIZE)

    nread = fh.readinto(header)
    if nread == 0:
        return None

    if nread != FRAME_HEADER_SIZE:
        raise error.Abort(
            _(b'received incomplete frame: got %d bytes: %s')
            % (nread, header)
        )

    h = parseheader(header)

    payload = fh.read(h.length)
    received = len(payload)
    if received != h.length:
        raise error.Abort(
            _(b'frame length error: expected %d; got %d')
            % (h.length, received)
        )

    return frame(
        requestid=h.requestid,
        streamid=h.streamid,
        streamflags=h.streamflags,
        typeid=h.typeid,
        flags=h.flags,
        payload=payload,
    )
Gregory Szorc
|
r37070 | |||
Augie Fackler
|
def createcommandframes(
    stream,
    requestid,
    cmd,
    args,
    datafh=None,
    maxframesize=DEFAULT_MAX_FRAME_SIZE,
    redirect=None,
):
    """Create frames necessary to transmit a request to run a command.

    This is a generator of bytearrays. Each item represents a frame
    ready to be sent over the wire to a peer.

    ``cmd`` and ``args`` (and ``redirect``, when truthy) are CBOR encoded
    and split across one or more command request frames. If ``datafh`` is
    given, its content is then emitted as command data frames. No frame
    carries more than ``maxframesize`` payload bytes.
    """
    data = {b'name': cmd}
    if args:
        data[b'args'] = args

    if redirect:
        data[b'redirect'] = redirect

    data = b''.join(cborutil.streamencode(data))

    offset = 0
    while True:
        flags = 0

        # Must set new or continuation flag.
        if not offset:
            flags |= FLAG_COMMAND_REQUEST_NEW
        else:
            flags |= FLAG_COMMAND_REQUEST_CONTINUATION

        # The "expect data" flag is set on all request frames.
        if datafh:
            flags |= FLAG_COMMAND_REQUEST_EXPECT_DATA

        payload = data[offset : offset + maxframesize]
        offset += len(payload)

        if len(payload) == maxframesize and offset < len(data):
            flags |= FLAG_COMMAND_REQUEST_MORE_FRAMES

        yield stream.makeframe(
            requestid=requestid,
            typeid=FRAME_TYPE_COMMAND_REQUEST,
            flags=flags,
            payload=payload,
        )

        if not (flags & FLAG_COMMAND_REQUEST_MORE_FRAMES):
            break

    if not datafh:
        return

    while True:
        # Honor the caller supplied frame size limit, exactly like the
        # request frames above.
        chunk = datafh.read(maxframesize)

        # A full read means more data may follow; a short read is treated
        # as end of stream.
        done = len(chunk) != maxframesize
        if done:
            flags = FLAG_COMMAND_DATA_EOS
            assert datafh.read(1) == b''
        else:
            flags = FLAG_COMMAND_DATA_CONTINUATION

        yield stream.makeframe(
            requestid=requestid,
            typeid=FRAME_TYPE_COMMAND_DATA,
            flags=flags,
            payload=chunk,
        )

        if done:
            break
Gregory Szorc
|
r37070 | |||
Augie Fackler
|
r43346 | |||
Gregory Szorc
|
def createcommandresponseokframe(stream, requestid):
    """Create the command response frame carrying ``{status: ok}``.

    If stream settings have already been sent, the CBOR payload is passed
    through the stream encoder first. Returns ``None`` when the encoder
    yields no output for it.
    """
    payload = b''.join(cborutil.streamencode({b'status': b'ok'}))

    encoded = False
    if stream.streamsettingssent:
        payload = stream.encode(payload)
        if not payload:
            return None
        encoded = True

    return stream.makeframe(
        requestid=requestid,
        typeid=FRAME_TYPE_COMMAND_RESPONSE,
        flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
        payload=payload,
        encoded=encoded,
    )
Gregory Szorc
|
r39595 | |||
Augie Fackler
|
r43346 | |||
def createcommandresponseeosframes(
    stream, requestid, maxframesize=DEFAULT_MAX_FRAME_SIZE
):
    """Create the frames that end a command response.

    Whatever ``stream.flush()`` returns is split into chunks of at most
    ``maxframesize`` bytes. Every frame but the last is flagged as a
    continuation; the last one carries the end-of-stream flag. When there
    is nothing to flush, a single empty end-of-stream frame is emitted.
    """
    payload = stream.flush()
    total = len(payload)
    # Frames are only marked as encoded if the flush produced output.
    encoded = payload != b''

    offset = 0
    while True:
        chunk = payload[offset : offset + maxframesize]
        offset += len(chunk)
        last = offset == total

        yield stream.makeframe(
            requestid=requestid,
            typeid=FRAME_TYPE_COMMAND_RESPONSE,
            flags=(
                FLAG_COMMAND_RESPONSE_EOS
                if last
                else FLAG_COMMAND_RESPONSE_CONTINUATION
            ),
            payload=chunk,
            encoded=encoded,
        )

        if last:
            break
Gregory Szorc
|
r37746 | |||
Augie Fackler
|
r43346 | |||
Gregory Szorc
|
def createalternatelocationresponseframe(stream, requestid, location):
    """Create a command response frame redirecting to another location.

    ``location`` must expose ``url`` and ``mediatype`` plus the optional
    attributes listed below; optional attributes whose value is ``None``
    are left out of the response.
    """
    locationdata = {
        b'url': location.url,
        b'mediatype': location.mediatype,
    }

    optionalattrs = (
        'size',
        'fullhashes',
        'fullhashseed',
        'serverdercerts',
        'servercadercerts',
    )
    for name in optionalattrs:
        value = getattr(location, name)
        if value is not None:
            locationdata[pycompat.bytestr(name)] = value

    data = {
        b'status': b'redirect',
        b'location': locationdata,
    }
    payload = b''.join(cborutil.streamencode(data))

    # Only run the payload through the stream encoder once stream settings
    # have been sent.
    encoded = False
    if stream.streamsettingssent:
        payload = stream.encode(payload)
        encoded = True

    return stream.makeframe(
        requestid=requestid,
        typeid=FRAME_TYPE_COMMAND_RESPONSE,
        flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
        payload=payload,
        encoded=encoded,
    )
Gregory Szorc
|
r40061 | |||
Gregory Szorc
|
def createcommanderrorresponse(stream, requestid, message, args=None):
    """Generate the single command response frame reporting an error.

    The frame carries a CBOR map with ``status`` set to ``error`` and an
    ``error`` map holding ``message`` (and ``args`` when truthy). It is
    flagged as end-of-stream.
    """
    # TODO should this be using a list of {'msg': ..., 'args': {}} so atom
    # formatting works consistently?
    errordata = {
        b'message': message,
    }
    if args:
        errordata[b'args'] = args

    body = {
        b'status': b'error',
        b'error': errordata,
    }
    payload = b''.join(cborutil.streamencode(body))

    yield stream.makeframe(
        requestid=requestid,
        typeid=FRAME_TYPE_COMMAND_RESPONSE,
        flags=FLAG_COMMAND_RESPONSE_EOS,
        payload=payload,
    )
Gregory Szorc
|
r37746 | |||
Gregory Szorc
|
def createerrorframe(stream, requestid, msg, errtype):
    """Generate a single error response frame.

    The payload is a CBOR map with ``type`` set to ``errtype`` and
    ``message`` set to a one element list wrapping ``msg``.
    """
    # TODO properly handle frame size limits.
    assert len(msg) <= DEFAULT_MAX_FRAME_SIZE

    body = {
        b'type': errtype,
        b'message': [{b'msg': msg}],
    }
    payload = b''.join(cborutil.streamencode(body))

    yield stream.makeframe(
        requestid=requestid,
        typeid=FRAME_TYPE_ERROR_RESPONSE,
        flags=0,
        payload=payload,
    )
Gregory Szorc
|
r37073 | |||
Augie Fackler
|
r43346 | |||
def createtextoutputframe(
    stream, requestid, atoms, maxframesize=DEFAULT_MAX_FRAME_SIZE
):
    """Create a text output frame to render text to people.

    ``atoms`` is an iterable of 3-tuples of (formatting string, args,
    labels).

    The formatting string contains ``%s`` tokens to be replaced by the
    corresponding indexed entry in ``args``. ``labels`` is an iterable of
    formatters to be applied at rendering time. In terms of the ``ui``
    class, each atom corresponds to a ``ui.write()``.

    Raises ``ValueError`` if any component is not ``bytes`` or if the
    encoded atoms do not fit in ``maxframesize`` bytes.
    """
    encodedatoms = []

    for formatting, args, labels in atoms:
        # TODO look for localstr, other types here?
        if not isinstance(formatting, bytes):
            raise ValueError(b'must use bytes formatting strings')
        if not all(isinstance(arg, bytes) for arg in args):
            raise ValueError(b'must use bytes for arguments')
        if not all(isinstance(label, bytes) for label in labels):
            raise ValueError(b'must use bytes for labels')

        # Formatting string must be ASCII.
        formatting = formatting.decode('ascii', 'replace').encode('ascii')

        # Arguments must be UTF-8.
        args = [arg.decode('utf-8', 'replace').encode('utf-8') for arg in args]

        # Labels must be ASCII. Unlike the above, invalid labels raise
        # rather than being replaced.
        labels = [
            label.decode('ascii', 'strict').encode('ascii') for label in labels
        ]

        # Empty args/labels are omitted from the atom.
        atom = {b'msg': formatting}
        if args:
            atom[b'args'] = args
        if labels:
            atom[b'labels'] = labels

        encodedatoms.append(atom)

    payload = b''.join(cborutil.streamencode(encodedatoms))

    if len(payload) > maxframesize:
        raise ValueError(b'cannot encode data in a single frame')

    yield stream.makeframe(
        requestid=requestid,
        typeid=FRAME_TYPE_TEXT_OUTPUT,
        flags=0,
        payload=payload,
    )
Gregory Szorc
|
r37303 | |||
Gregory Szorc
|
class bufferingcommandresponseemitter:
    """Helper object to emit command response frames intelligently.

    Raw command response data is likely emitted in chunks much smaller
    than what can fit in a single frame. This class exists to buffer
    chunks until enough data is available to fit in a single frame.

    TODO we'll need something like this when compression is supported.
    So it might make sense to implement this functionality at the stream
    level.
    """

    def __init__(self, stream, requestid, maxframesize=DEFAULT_MAX_FRAME_SIZE):
        self._stream = stream
        self._requestid = requestid
        self._maxsize = maxframesize
        # Encoded chunks waiting to be emitted, and their combined size.
        self._chunks = []
        self._chunkssize = 0

    def _responseframe(self, payload):
        """Build a continuation command response frame for ``payload``."""
        return self._stream.makeframe(
            self._requestid,
            typeid=FRAME_TYPE_COMMAND_RESPONSE,
            flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
            payload=payload,
            encoded=True,
        )

    def send(self, data):
        """Send new data for emission.

        Is a generator of new frames that were derived from the new input.

        If the special input ``None`` is received, flushes all buffered
        data to frames.
        """
        if data is None:
            yield from self._flush()
            return

        data = self._stream.encode(data)

        # There is a ton of potential to do more complicated things here.
        # Our immediate goal is to coalesce small chunks into big frames,
        # not achieve the fewest number of frames possible. So we go with
        # a simple implementation:
        #
        # * If a chunk is too large for a frame, we flush and emit frames
        #   for the new chunk.
        # * If a chunk can be buffered without total buffered size limits
        #   being exceeded, we do that.
        # * If a chunk causes us to go over our buffering limit, we flush
        #   and then buffer the new chunk.

        # The encoder may not have produced any output yet.
        if not data:
            return

        size = len(data)
        limit = self._maxsize

        if size > limit:
            yield from self._flush()

            # Now emit frames for the big chunk.
            offset = 0
            while offset != size:
                chunk = data[offset : offset + limit]
                offset += len(chunk)
                yield self._responseframe(chunk)
            return

        # If we don't have enough to constitute a full frame, buffer and
        # return.
        if self._chunkssize + size < limit:
            self._chunks.append(data)
            self._chunkssize += size
            return

        # Else flush what we have and buffer the new chunk. We could do
        # something more intelligent here, like break the chunk. Let's
        # keep things simple for now.
        yield from self._flush()

        self._chunks.append(data)
        self._chunkssize = size

    def _flush(self):
        """Emit everything buffered so far as at most one frame."""
        payload = b''.join(self._chunks)
        assert len(payload) <= self._maxsize

        self._chunks.clear()
        self._chunkssize = 0

        if payload:
            yield self._responseframe(payload)
Gregory Szorc
|
r39596 | |||
Gregory Szorc
|
r40167 | # TODO consider defining encoders/decoders using the util.compressionengine | ||
# mechanism. | ||||
Augie Fackler
|
r43346 | |||
Gregory Szorc
|
class identityencoder:
    """Encoder for the "identity" stream encoding profile."""

    def __init__(self, ui):
        """Accept ``ui`` for interface parity; it is not used."""

    def encode(self, data):
        """Return ``data`` unchanged."""
        return data

    def flush(self):
        """Nothing is ever buffered, so there is nothing to flush."""
        return b''

    def finish(self):
        """Nothing is ever buffered, so there is no trailing data."""
        return b''
Augie Fackler
|
r43346 | |||
Gregory Szorc
|
class identitydecoder:
    """Decoder for the "identity" stream encoding profile."""

    def __init__(self, ui, extraobjs):
        """Reject any extra objects; this profile takes none."""
        if not extraobjs:
            return

        raise error.Abort(
            _(b'identity decoder received unexpected additional values')
        )

    def decode(self, data):
        """Return ``data`` unchanged."""
        return data
Augie Fackler
|
r43346 | |||
Gregory Szorc
|
class zlibencoder:
    """Stream encoder backed by a single zlib compression object."""

    def __init__(self, ui):
        # Imported here rather than at module level; the module object is
        # kept around because the flush modes are needed later.
        import zlib

        self._zlib = zlib
        self._compressor = zlib.compressobj()

    def encode(self, data):
        """Feed ``data`` to the compressor and return any output produced."""
        compressed = self._compressor.compress(data)
        return compressed

    def flush(self):
        """Emit all pending compressed output without ending the stream."""
        # Z_SYNC_FLUSH doesn't reset compression context, which is
        # what we want.
        mode = self._zlib.Z_SYNC_FLUSH
        return self._compressor.flush(mode)

    def finish(self):
        """End the compressed stream and drop the compressor."""
        trailer = self._compressor.flush(self._zlib.Z_FINISH)
        self._compressor = None
        return trailer
Augie Fackler
|
r43346 | |||
Gregory Szorc
|
class zlibdecoder:
    """Stream decoder backed by a single zlib decompression object."""

    def __init__(self, ui, extraobjs):
        import zlib

        # This profile takes no extra objects.
        if extraobjs:
            raise error.Abort(
                _(b'zlib decoder received unexpected additional values')
            )

        self._decompressor = zlib.decompressobj()

    def decode(self, data):
        """Feed ``data`` to the decompressor and return what it produces."""
        decompressed = self._decompressor.decompress(data)
        return decompressed
Augie Fackler
|
r43346 | |||
Gregory Szorc
|
class zstdbaseencoder:
    """Base class for stream encoders backed by a zstd compression object."""

    def __init__(self, level):
        from . import zstd

        # Keep the module around; the flush modes are needed later.
        self._zstd = zstd
        self._compressor = zstd.ZstdCompressor(level=level).compressobj()

    def encode(self, data):
        """Feed ``data`` to the compressor and return any output produced."""
        compressed = self._compressor.compress(data)
        return compressed

    def flush(self):
        """Emit pending compressed output without ending the stream."""
        # COMPRESSOBJ_FLUSH_BLOCK flushes all data previously fed into the
        # compressor and allows a decompressor to access all encoded data
        # up to this point.
        mode = self._zstd.COMPRESSOBJ_FLUSH_BLOCK
        return self._compressor.flush(mode)

    def finish(self):
        """End the compressed stream and drop the compressor."""
        trailer = self._compressor.flush(self._zstd.COMPRESSOBJ_FLUSH_FINISH)
        self._compressor = None
        return trailer
Augie Fackler
|
r43346 | |||
Gregory Szorc
|
class zstd8mbencoder(zstdbaseencoder):
    """zstd stream encoder using compression level 3."""

    def __init__(self, ui):
        # ``ui`` is accepted for interface parity with the other encoders
        # and is not used.
        super().__init__(3)
Augie Fackler
|
r43346 | |||
Gregory Szorc
|
class zstdbasedecoder:
    """Base class for stream decoders backed by a zstd decompression object."""

    def __init__(self, maxwindowsize):
        from . import zstd

        decompressor = zstd.ZstdDecompressor(max_window_size=maxwindowsize)
        self._decompressor = decompressor.decompressobj()

    def decode(self, data):
        """Feed ``data`` to the decompressor and return what it produces."""
        decompressed = self._decompressor.decompress(data)
        return decompressed
Augie Fackler
|
r43346 | |||
Gregory Szorc
|
class zstd8mbdecoder(zstdbasedecoder):
    """zstd stream decoder with the window size capped at 8 MiB."""

    def __init__(self, ui, extraobjs):
        # This profile takes no extra objects.
        if extraobjs:
            raise error.Abort(
                _(b'zstd8mb decoder received unexpected additional values')
            )

        super().__init__(maxwindowsize=8 * 1024 * 1024)
Augie Fackler
|
r43346 | |||
Gregory Szorc
|
# We lazily populate these (see populatestreamencoders()) to avoid
# excessive module imports when importing this module.
#
# Maps a stream encoding profile name to an (encoder, decoder) class pair.
STREAM_ENCODERS = dict()
# Profile names in order of preference, most preferred first.
STREAM_ENCODERS_ORDER = list()
Augie Fackler
|
r43346 | |||
Gregory Szorc
|
def populatestreamencoders():
    """Fill ``STREAM_ENCODERS`` and ``STREAM_ENCODERS_ORDER`` on first use.

    Does nothing if the registry has already been populated. The zstd
    profile is only registered when the ``zstd`` module can be imported.
    """
    if STREAM_ENCODERS:
        return

    try:
        from . import zstd

        # Bare attribute access; presumably forces a lazily imported module
        # to actually load so ImportError surfaces here -- TODO confirm.
        zstd.__version__
    except ImportError:
        zstd = None

    profiles = []

    # zstandard is fastest and is preferred.
    if zstd:
        profiles.append((b'zstd-8mb', zstd8mbencoder, zstd8mbdecoder))

    profiles.append((b'zlib', zlibencoder, zlibdecoder))
    profiles.append((b'identity', identityencoder, identitydecoder))

    for name, encodercls, decodercls in profiles:
        STREAM_ENCODERS[name] = (encodercls, decodercls)
        STREAM_ENCODERS_ORDER.append(name)
Augie Fackler
|
r43346 | |||
Gregory Szorc
|
class stream:
    """Represents a logical unidirectional series of frames."""

    def __init__(self, streamid, active=False):
        self.streamid = streamid
        # Becomes true once the first frame has been created for the stream.
        self._active = active

    def makeframe(self, requestid, typeid, flags, payload):
        """Create a frame to be sent out over this stream.

        Only returns the frame instance. Does not actually send it.

        The first frame created on an inactive stream carries the
        "begin stream" flag and marks the stream as active.
        """
        if self._active:
            streamflags = 0
        else:
            streamflags = STREAM_FLAG_BEGIN_STREAM
            self._active = True

        return makeframe(
            requestid, self.streamid, streamflags, typeid, flags, payload
        )
Gregory Szorc
|
r37304 | |||
Gregory Szorc
|
class inputstream(stream):
    """Represents a stream used for receiving data."""

    def __init__(self, streamid, active=False):
        super().__init__(streamid, active=active)
        # No decoder means data passes through unchanged.
        self._decoder = None

    def setdecoder(self, ui, name, extraobjs):
        """Set the decoder for this stream.

        Receives the stream profile name and any additional CBOR objects
        decoded from the stream encoding settings frame payloads.

        Raises ``error.Abort`` if ``name`` is not a registered profile.
        """
        if name not in STREAM_ENCODERS:
            raise error.Abort(_(b'unknown stream decoder: %s') % name)

        # Registry values are (encoder, decoder) pairs.
        self._decoder = STREAM_ENCODERS[name][1](ui, extraobjs)

    def decode(self, data):
        """Run ``data`` through the decoder, if one has been set."""
        # Default is identity decoder. We don't bother instantiating one
        # because it is trivial.
        if not self._decoder:
            return data

        return self._decoder.decode(data)

    def flush(self):
        """Return any data the decoder is still holding on to.

        Returns ``b''`` when no decoder is set or when the decoder has no
        ``flush()`` method.
        """
        if not self._decoder:
            return b''

        # Decoders are only required to implement decode(); calling flush()
        # unconditionally would raise AttributeError for such decoders.
        flush = getattr(self._decoder, 'flush', None)
        if flush is None:
            return b''

        return flush()
Gregory Szorc
|
r40164 | |||
Augie Fackler
|
r43346 | |||
Gregory Szorc
|
class outputstream(stream):
    """Represents a stream used for sending data.

    Tracks the optional stream encoder and whether the stream settings
    frame announcing that encoder has been produced yet.
    """

    def __init__(self, streamid, active=False):
        super(outputstream, self).__init__(streamid, active=active)
        # Becomes True once makeframe() has built a stream settings frame
        # carrying the end-of-settings flag.
        self.streamsettingssent = False
        # Encoder instance and the profile name it was created from. Both
        # stay None until setencoder() is called.
        self._encoder = None
        self._encodername = None

    def setencoder(self, ui, name):
        """Set the encoder for this stream.

        Receives the stream profile name.

        Raises ``error.Abort`` when ``name`` is not a registered encoding.
        """
        if name not in STREAM_ENCODERS:
            raise error.Abort(_(b'unknown stream encoder: %s') % name)

        self._encoder = STREAM_ENCODERS[name][0](ui)
        self._encodername = name

    def encode(self, data):
        """Return ``data`` run through the encoder, if one is set."""
        if not self._encoder:
            return data

        return self._encoder.encode(data)

    def flush(self):
        """Flush the encoder; returns ``b''`` when no encoder is set."""
        if not self._encoder:
            return b''

        return self._encoder.flush()

    def finish(self):
        """Finish the encoder and return whatever it produces.

        Returns ``b''`` when no encoder is set.
        """
        if not self._encoder:
            return b''

        # Hand the encoder's result back to the caller. Previously it was
        # dropped, so this method returned None whenever an encoder was set
        # even though the no-encoder path returns bytes.
        return self._encoder.finish()

    def makeframe(self, requestid, typeid, flags, payload, encoded=False):
        """Create a frame to be sent out over this stream.

        ``encoded`` marks the payload as having the stream encoding applied.
        That is only allowed after the stream settings frame has been made;
        otherwise ``error.ProgrammingError`` is raised.

        Only returns the frame instance. Does not actually send it.
        """
        streamflags = 0
        if not self._active:
            streamflags |= STREAM_FLAG_BEGIN_STREAM
            self._active = True

        if encoded:
            if not self.streamsettingssent:
                raise error.ProgrammingError(
                    b'attempting to send encoded frame without sending stream '
                    b'settings'
                )

            streamflags |= STREAM_FLAG_ENCODING_APPLIED

        # Remember that the final stream settings frame has been produced so
        # later frames are allowed to be flagged as encoded.
        if (
            typeid == FRAME_TYPE_STREAM_SETTINGS
            and flags & FLAG_STREAM_ENCODING_SETTINGS_EOS
        ):
            self.streamsettingssent = True

        return makeframe(
            requestid, self.streamid, streamflags, typeid, flags, payload
        )

    def makestreamsettingsframe(self, requestid):
        """Create a stream settings frame for this stream.

        Returns frame data or None if no stream settings frame is needed or has
        already been sent.
        """
        if not self._encoder or self.streamsettingssent:
            return None

        # The payload is the CBOR encoding of the encoder's profile name.
        payload = b''.join(cborutil.streamencode(self._encodername))
        return self.makeframe(
            requestid,
            FRAME_TYPE_STREAM_SETTINGS,
            FLAG_STREAM_ENCODING_SETTINGS_EOS,
            payload,
        )
Gregory Szorc
|
r40173 | |||
Gregory Szorc
|
def ensureserverstream(stream):
    """Check that ``stream`` is one the server may write to.

    Returns None for an even stream ID and raises
    ``error.ProgrammingError`` for an odd one.
    """
    if not stream.streamid % 2:
        return

    raise error.ProgrammingError(
        b'server should only write to even '
        b'numbered streams; %d is not even' % stream.streamid
    )
Gregory Szorc
|
r37078 | |||
Gregory Szorc
|
# Protocol settings assumed for a sender until it says otherwise: only the
# identity content encoding.
DEFAULT_PROTOCOL_SETTINGS = {b'contentencodings': [b'identity']}
Augie Fackler
|
r43346 | |||
Gregory Szorc
|
r49801 | class serverreactor: | ||
Gregory Szorc
|
r37070 | """Holds state of a server handling frame-based protocol requests. | ||
This class is the "brain" of the unified frame-based protocol server | ||||
component. While the protocol is stateless from the perspective of | ||||
requests/commands, something needs to track which frames have been | ||||
received, what frames to expect, etc. This class is that thing. | ||||
Instances are modeled as a state machine of sorts. Instances are also
reactive to external events. The point of this class is to encapsulate
the state of the connection and the exchange of frames, not to perform | ||||
work. Instead, callers tell this class when something occurs, like a | ||||
frame arriving. If that activity is worthy of a follow-up action (say | ||||
*run a command*), the return value of that handler will say so. | ||||
I/O and CPU intensive operations are purposefully delegated outside of | ||||
this class. | ||||
Consumers are expected to tell instances when events occur. They do so by | ||||
calling the various ``on*`` methods. These methods return a 2-tuple | ||||
describing any follow-up action(s) to take. The first element is the | ||||
name of an action to perform. The second is a data structure (usually | ||||
a dict) specific to that action that contains more information. e.g. | ||||
if the server wants to send frames back to the client, the data structure | ||||
will contain a reference to those frames. | ||||
Valid actions that consumers can be instructed to take are: | ||||
Gregory Szorc
|
r37073 | sendframes | ||
Indicates that frames should be sent to the client. The ``framegen`` | ||||
key contains a generator of frames that should be sent. The server | ||||
assumes that all frames are sent to the client. | ||||
Gregory Szorc
|
r37070 | error | ||
Indicates that an error occurred. Consumer should probably abort. | ||||
runcommand | ||||
Indicates that the consumer should run a wire protocol command. Details | ||||
of the command to run are given in the data structure. | ||||
wantframe | ||||
Indicates that nothing of interest happened and the server is waiting on | ||||
more frames from the client before anything interesting can be done. | ||||
Gregory Szorc
|
r37074 | |||
noop | ||||
Indicates no additional action is required. | ||||
Gregory Szorc
|
r37076 | |||
Known Issues | ||||
------------ | ||||
There are no limits to the number of partially received commands or their | ||||
size. A malicious client could stream command request data and exhaust the | ||||
server's memory. | ||||
Partially received commands are not acted upon when end of input is | ||||
reached. Should the server error if it receives a partial request? | ||||
Should the client send a message to abort a partially transmitted request | ||||
to facilitate graceful shutdown? | ||||
Active requests that haven't been responded to aren't tracked. This means | ||||
that if we receive a command and instruct its dispatch, another command | ||||
with its request ID can come in over the wire and there will be a race | ||||
between who responds to what. | ||||
Gregory Szorc
|
r37070 | """ | ||
Gregory Szorc
|
def __init__(self, ui, deferoutput=False):
    """Construct a new server reactor.

    ``deferoutput`` can be used to indicate that no output frames should be
    instructed to be sent until input has been exhausted. In this mode,
    events that would normally generate output frames (such as a command
    response being ready) will instead defer instructing the consumer to
    send those frames. This is useful for half-duplex transports where the
    sender cannot receive until all data has been transmitted.
    """
    self._ui = ui
    self._deferoutput = deferoutput

    # The reactor starts out in the "initial" state.
    self._state = b'initial'

    # Frame generators held back while output is deferred.
    self._bufferedframegens = []

    # stream id -> stream instance for all active streams from the client.
    self._incomingstreams = {}

    # Streams created for sending to the client. IDs are handed out
    # starting at 2.
    self._outgoingstreams = {}
    self._nextoutgoingstreamid = 2

    # request id -> dict of commands that are actively being received.
    self._receivingcommands = {}

    # Request IDs that have been received and are actively being processed.
    # Once all output for a request has been sent, it is removed from this
    # set.
    self._activecommands = set()

    # Sender protocol settings are optional. Set implied default values.
    self._sendersettings = DEFAULT_PROTOCOL_SETTINGS.copy()
    self._protocolsettingsdecoder = None

    populatestreamencoders()
Gregory Szorc
|
def onframerecv(self, frame):
    """Process a frame that has been received off the wire.

    Validates the stream the frame arrived on, then hands the frame to the
    handler for the reactor's current state.

    Returns a 2-tuple: the name of the action the consumer should take
    next, if any, and a data structure with details for that action.
    """
    streamid = frame.streamid
    streamflags = frame.streamflags

    # Frames are only accepted on odd numbered streams.
    if streamid % 2 == 0:
        self._state = b'errored'
        return self._makeerrorresult(
            _(b'received frame with even numbered stream ID: %d') % streamid
        )

    if streamid not in self._incomingstreams:
        # A stream we have not seen must announce itself.
        if not streamflags & STREAM_FLAG_BEGIN_STREAM:
            self._state = b'errored'
            return self._makeerrorresult(
                _(
                    b'received frame on unknown inactive stream without '
                    b'beginning of stream flag set'
                )
            )

        self._incomingstreams[streamid] = inputstream(streamid)

    if streamflags & STREAM_FLAG_ENCODING_APPLIED:
        # TODO handle decoding frames
        self._state = b'errored'
        raise error.ProgrammingError(
            b'support for decoding stream payloads not yet implemented'
        )

    if streamflags & STREAM_FLAG_END_STREAM:
        del self._incomingstreams[streamid]

    # state name -> method that consumes a frame while in that state.
    dispatch = {
        b'initial': self._onframeinitial,
        b'protocol-settings-receiving': self._onframeprotocolsettings,
        b'idle': self._onframeidle,
        b'command-receiving': self._onframecommandreceiving,
        b'errored': self._onframeerrored,
    }

    if self._state not in dispatch:
        raise error.ProgrammingError(b'unhandled state: %s' % self._state)

    return dispatch[self._state](frame)
Gregory Szorc
|
r37070 | |||
Gregory Szorc
|
def oncommandresponsereadyobjects(self, stream, requestid, objs):
    """Signal that objects are ready to be sent to the client.

    ``objs`` is an iterable of objects (typically a generator) that will
    be encoded via CBOR and added to frames, which will be sent to the
    client.

    ``stream`` is the stream to write to; ``ensureserverstream()`` rejects
    it with ``error.ProgrammingError`` if its ID is odd. ``requestid``
    identifies the request being answered.

    Returns whatever ``_handlesendframes()`` returns for the generator of
    response frames. Frames are produced lazily, and ``requestid`` is only
    removed from the active commands once that generator runs to the end.
    """
    ensureserverstream(stream)

    # A more robust solution would be to check for objs.{next,__next__}.
    if isinstance(objs, list):
        objs = iter(objs)

    # We need to take care over exception handling. Uncaught exceptions
    # when generating frames could lead to premature end of the frame
    # stream and the possibility of the server or client process getting
    # in a bad state.
    #
    # Keep in mind that if ``objs`` is a generator, advancing it could
    # raise exceptions that originated in e.g. wire protocol command
    # functions. That is why we differentiate between exceptions raised
    # when iterating versus other exceptions that occur.
    #
    # In all cases, when the function finishes, the request is fully
    # handled and no new frames for it should be seen.
    def sendframes():
        emitted = False
        alternatelocationsent = False
        emitter = bufferingcommandresponseemitter(stream, requestid)

        while True:
            try:
                obj = next(objs)
            except StopIteration:
                # Input exhausted: drain what the emitter still buffers,
                # then close the response if anything was sent at all.
                for frame in emitter.send(None):
                    yield frame

                if emitted:
                    for frame in createcommandresponseeosframes(
                        stream, requestid
                    ):
                        yield frame
                break

            except error.WireprotoCommandError as e:
                for frame in createcommanderrorresponse(
                    stream, requestid, e.message, e.messageargs
                ):
                    yield frame
                break

            except Exception as e:
                for frame in createerrorframe(
                    stream,
                    requestid,
                    stringutil.forcebytestr(e),
                    errtype=b'server',
                ):
                    yield frame

                break

            try:
                # Alternate location responses can only be the first and
                # only object in the output stream.
                if isinstance(obj, wireprototypes.alternatelocationresponse):
                    if emitted:
                        raise error.ProgrammingError(
                            b'alternatelocationresponse seen after initial '
                            b'output object'
                        )

                    frame = stream.makestreamsettingsframe(requestid)
                    if frame:
                        yield frame

                    yield createalternatelocationresponseframe(
                        stream, requestid, obj
                    )

                    alternatelocationsent = True
                    emitted = True
                    continue

                if alternatelocationsent:
                    raise error.ProgrammingError(
                        b'object follows alternatelocationresponse'
                    )

                if not emitted:
                    # Frame is optional.
                    frame = stream.makestreamsettingsframe(requestid)
                    if frame:
                        yield frame

                    # May be None if empty frame (due to encoding).
                    frame = createcommandresponseokframe(stream, requestid)
                    if frame:
                        yield frame

                    emitted = True

                # Objects emitted by command functions can be serializable
                # data structures or special types.
                # TODO consider extracting the content normalization to a
                # standalone function, as it may be useful for e.g. cachers.

                # A pre-encoded object is sent directly to the emitter.
                if isinstance(obj, wireprototypes.encodedresponse):
                    for frame in emitter.send(obj.data):
                        yield frame

                elif isinstance(
                    obj, wireprototypes.indefinitebytestringresponse
                ):
                    for chunk in cborutil.streamencodebytestringfromiter(
                        obj.chunks
                    ):
                        for frame in emitter.send(chunk):
                            yield frame

                # A regular object is CBOR encoded.
                else:
                    for chunk in cborutil.streamencode(obj):
                        for frame in emitter.send(chunk):
                            yield frame

            except Exception as e:
                # Convert the exception with forcebytestr(), like the
                # handler above. Formatting it with b'%s' raises TypeError
                # for exceptions that do not implement __bytes__, which
                # would end the frame stream without an error frame.
                for frame in createerrorframe(
                    stream,
                    requestid,
                    stringutil.forcebytestr(e),
                    errtype=b'server',
                ):
                    yield frame

                break

        self._activecommands.remove(requestid)

    return self._handlesendframes(sendframes())
Gregory Szorc
|
def oninputeof(self):
    """Signals that end of input has been received.

    No more frames will be received. All pending activity should be
    completed.

    Returns a ``sendframes`` action whose ``framegen`` yields every frame
    of every buffered generator, in order, when output was deferred and
    something was buffered. Otherwise returns a ``noop`` action.
    """
    # TODO should we do anything about in-flight commands?
    if self._deferoutput and self._bufferedframegens:
        # If we buffered all our responses, emit those.
        def drainbuffered():
            for pending in self._bufferedframegens:
                for item in pending:
                    yield item

        return b'sendframes', {b'framegen': drainbuffered()}

    return b'noop', {}
Gregory Szorc
|
r37073 | |||
Gregory Szorc
|
r37746 | def _handlesendframes(self, framegen): | ||
if self._deferoutput: | ||||
self._bufferedframegens.append(framegen) | ||||
Augie Fackler
|
r43347 | return b'noop', {} | ||
Gregory Szorc
|
r37746 | else: | ||
Augie Fackler
|
r46554 | return b'sendframes', { | ||
b'framegen': framegen, | ||||
} | ||||
Gregory Szorc
|
r37746 | |||
Gregory Szorc
|
def onservererror(self, stream, requestid, msg):
    """Report a server error for ``requestid`` on ``stream``.

    Returns the result of ``_handlesendframes()`` for a generator of error
    frames. The request is removed from the active commands once that
    generator has been fully consumed.
    """
    ensureserverstream(stream)

    def errorframes():
        framegen = createerrorframe(
            stream, requestid, msg, errtype=b'server'
        )
        for frame in framegen:
            yield frame

        self._activecommands.remove(requestid)

    return self._handlesendframes(errorframes())
def oncommanderror(self, stream, requestid, message, args=None):
    """Called when a command encountered an error before sending output.

    Returns the result of ``_handlesendframes()`` for a generator of
    command error response frames. The request is removed from the active
    commands once that generator has been fully consumed.
    """
    ensureserverstream(stream)

    def errorframes():
        framegen = createcommanderrorresponse(
            stream, requestid, message, args
        )
        for frame in framegen:
            yield frame

        self._activecommands.remove(requestid)

    return self._handlesendframes(errorframes())
Gregory Szorc
|
r37073 | |||
Gregory Szorc
|
def makeoutputstream(self):
    """Create a stream to be used for sending data to the client.

    If this is called before protocol settings frames are received, we
    don't know what stream encodings are supported by the client and
    we will default to identity.
    """
    newid = self._nextoutgoingstreamid
    self._nextoutgoingstreamid = newid + 2

    out = outputstream(newid)
    self._outgoingstreams[newid] = out

    # Always use the *server's* preferred encoder over the client's,
    # as servers have more to lose from sub-optimal encoders being used.
    chosen = next(
        (
            name
            for name in STREAM_ENCODERS_ORDER
            if name in self._sendersettings[b'contentencodings']
        ),
        None,
    )
    if chosen is not None:
        out.setencoder(self._ui, chosen)

    return out
Gregory Szorc
|
r37070 | def _makeerrorresult(self, msg): | ||
Augie Fackler
|
r46554 | return b'error', { | ||
b'message': msg, | ||||
} | ||||
Gregory Szorc
|
r37070 | |||
Gregory Szorc
|
def _makeruncommandresult(self, requestid):
    """Turn a fully received command into a ``runcommand`` action.

    Removes the command from the receiving set, updates the reactor state,
    decodes the buffered request payload as CBOR and marks the request as
    active.

    Returns an ``error`` action when the decoded request has no ``name``.
    Raises ``error.ProgrammingError`` when the request frames were not
    marked done.
    """
    received = self._receivingcommands[requestid]

    if not received[b'requestdone']:
        self._state = b'errored'
        raise error.ProgrammingError(
            b'should not be called without requestdone set'
        )

    del self._receivingcommands[requestid]

    # Other commands may still be mid-flight.
    self._state = (
        b'command-receiving' if self._receivingcommands else b'idle'
    )

    # Decode the payloads as CBOR.
    payloadbuf = received[b'payload']
    payloadbuf.seek(0)
    request = cborutil.decodeall(payloadbuf.getvalue())[0]

    if b'name' not in request:
        self._state = b'errored'
        return self._makeerrorresult(
            _(b'command request missing "name" field')
        )

    # Arguments are optional in the request.
    request.setdefault(b'args', {})

    assert requestid not in self._activecommands
    self._activecommands.add(requestid)

    databuf = received[b'data']
    if databuf:
        commanddata = databuf.getvalue()
    else:
        commanddata = None

    return (
        b'runcommand',
        {
            b'requestid': requestid,
            b'command': request[b'name'],
            b'args': request[b'args'],
            b'redirect': request.get(b'redirect'),
            b'data': commanddata,
        },
    )
Gregory Szorc
|
r37070 | |||
def _makewantframeresult(self): | ||||
Augie Fackler
|
r46554 | return b'wantframe', { | ||
b'state': self._state, | ||||
} | ||||
Gregory Szorc
|
r37070 | |||
Gregory Szorc
|
def _validatecommandrequestframe(self, frame):
    """Check the new/continuation flags of a command request frame.

    Exactly one of the two flags must be set. Returns None when that
    holds; otherwise moves the reactor to the errored state and returns an
    ``error`` action.
    """
    isnew = bool(frame.flags & FLAG_COMMAND_REQUEST_NEW)
    iscontinuation = bool(frame.flags & FLAG_COMMAND_REQUEST_CONTINUATION)

    if isnew != iscontinuation:
        return None

    self._state = b'errored'

    if isnew:
        msg = _(
            b'received command request frame with both new and '
            b'continuation flags set'
        )
    else:
        msg = _(
            b'received command request frame with neither new nor '
            b'continuation flags set'
        )

    return self._makeerrorresult(msg)
Gregory Szorc
|
r37308 | |||
Gregory Szorc
|
def _onframeinitial(self, frame):
    """Handle a frame received while in the "initial" state.

    A sender protocol settings frame or a command request frame switches
    the state and is forwarded to the matching handler. Any other frame
    type is an error.
    """
    typeid = frame.typeid

    if typeid == FRAME_TYPE_COMMAND_REQUEST:
        self._state = b'idle'
        return self._onframeidle(frame)

    if typeid == FRAME_TYPE_SENDER_PROTOCOL_SETTINGS:
        self._state = b'protocol-settings-receiving'
        self._protocolsettingsdecoder = cborutil.bufferingdecoder()
        return self._onframeprotocolsettings(frame)

    self._state = b'errored'
    return self._makeerrorresult(
        _(
            b'expected sender protocol settings or command request '
            b'frame; got %d'
        )
        % typeid
    )
Gregory Szorc
|
r40162 | |||
def _onframeprotocolsettings(self, frame):
    """Handle a frame while sender protocol settings are being received.

    Feeds the frame payload to the buffering CBOR decoder. On a
    continuation frame, asks for more frames. On the end-of-stream frame,
    requires exactly one decoded value, applies its ``contentencodings``
    entry (if present) to the sender settings, and moves to the idle
    state.

    Returns a ``wantframe`` action on success and an ``error`` action on
    any violation.
    """

    def fail(msg):
        self._state = b'errored'
        return self._makeerrorresult(msg)

    assert self._state == b'protocol-settings-receiving'
    assert self._protocolsettingsdecoder is not None

    if frame.typeid != FRAME_TYPE_SENDER_PROTOCOL_SETTINGS:
        return fail(
            _(b'expected sender protocol settings frame; got %d')
            % frame.typeid
        )

    hasmore = bool(frame.flags & FLAG_SENDER_PROTOCOL_SETTINGS_CONTINUATION)
    iseos = bool(frame.flags & FLAG_SENDER_PROTOCOL_SETTINGS_EOS)

    # Exactly one of the two flags must be set.
    if hasmore and iseos:
        return fail(
            _(
                b'sender protocol settings frame cannot have both '
                b'continuation and end of stream flags set'
            )
        )

    if not (hasmore or iseos):
        return fail(
            _(
                b'sender protocol settings frame must have continuation or '
                b'end of stream flag set'
            )
        )

    # TODO establish limits for maximum amount of data that can be
    # buffered.
    try:
        self._protocolsettingsdecoder.decode(frame.payload)
    except Exception as e:
        return fail(
            _(
                b'error decoding CBOR from sender protocol settings frame: %s'
            )
            % stringutil.forcebytestr(e)
        )

    if hasmore:
        return self._makewantframeresult()

    assert iseos

    values = self._protocolsettingsdecoder.getavailable()
    self._protocolsettingsdecoder = None

    if not values:
        return fail(
            _(b'sender protocol settings frame did not contain CBOR data')
        )

    if len(values) > 1:
        return fail(
            _(
                b'sender protocol settings frame contained multiple CBOR '
                b'values'
            )
        )

    settings = values[0]

    if b'contentencodings' in settings:
        self._sendersettings[b'contentencodings'] = settings[
            b'contentencodings'
        ]

    self._state = b'idle'

    return self._makewantframeresult()
Gregory Szorc
|
def _onframeidle(self, frame):
    """Handle a frame that should start a new command request.

    Records the new request in the receiving set. Returns a
    ``runcommand`` result when the request is complete in this one frame,
    a ``wantframe`` action when more request or data frames are expected,
    and an ``error`` action when the frame is not acceptable.
    """

    def fail(msg):
        self._state = b'errored'
        return self._makeerrorresult(msg)

    # The only frame type that should be received in this state is a
    # command request.
    if frame.typeid != FRAME_TYPE_COMMAND_REQUEST:
        return fail(
            _(b'expected command request frame; got %d') % frame.typeid
        )

    invalid = self._validatecommandrequestframe(frame)
    if invalid:
        return invalid

    requestid = frame.requestid

    if requestid in self._receivingcommands:
        return fail(_(b'request with ID %d already received') % requestid)

    if requestid in self._activecommands:
        return fail(_(b'request with ID %d is already active') % requestid)

    flags = frame.flags

    if not flags & FLAG_COMMAND_REQUEST_NEW:
        return fail(
            _(b'received command request frame without new flag set')
        )

    moreframes = flags & FLAG_COMMAND_REQUEST_MORE_FRAMES
    expectingdata = flags & FLAG_COMMAND_REQUEST_EXPECT_DATA

    # Written rather than passed to the constructor so the buffer position
    # stays at the end, ready for continuation frames to append.
    payloadbuf = util.bytesio()
    payloadbuf.write(frame.payload)

    self._receivingcommands[requestid] = {
        b'payload': payloadbuf,
        b'data': None,
        b'requestdone': not moreframes,
        b'expectingdata': bool(expectingdata),
    }

    if moreframes or expectingdata:
        self._state = b'command-receiving'
        return self._makewantframeresult()

    # This is the final frame for this request. Dispatch it.
    return self._makeruncommandresult(requestid)
Gregory Szorc
|
r37070 | |||
Gregory Szorc
|
def _onframecommandreceiving(self, frame):
    """Handle a frame while at least one command is partially received.

    A command request frame with the new flag is treated as the start of
    another command. Any other frame must belong to a request that is
    being received and is not yet active: request continuation frames
    extend its payload, command data frames are passed on to
    ``_handlecommanddataframe()``.

    Returns a ``runcommand``, ``wantframe`` or ``error`` result.
    """

    def fail(msg):
        self._state = b'errored'
        return self._makeerrorresult(msg)

    typeid = frame.typeid
    requestid = frame.requestid
    isrequest = typeid == FRAME_TYPE_COMMAND_REQUEST

    if isrequest:
        # Process new command requests as such.
        if frame.flags & FLAG_COMMAND_REQUEST_NEW:
            return self._onframeidle(frame)

        invalid = self._validatecommandrequestframe(frame)
        if invalid:
            return invalid

    # All other frames should be related to a command that is currently
    # receiving but is not active.
    if requestid in self._activecommands:
        return fail(
            _(b'received frame for request that is still active: %d')
            % requestid
        )

    if requestid not in self._receivingcommands:
        return fail(
            _(b'received frame for request that is not receiving: %d')
            % requestid
        )

    pending = self._receivingcommands[requestid]

    if isrequest:
        moreframes = frame.flags & FLAG_COMMAND_REQUEST_MORE_FRAMES
        expectingdata = bool(frame.flags & FLAG_COMMAND_REQUEST_EXPECT_DATA)

        if pending[b'requestdone']:
            return fail(
                _(
                    b'received command request frame when request frames '
                    b'were supposedly done'
                )
            )

        if expectingdata != pending[b'expectingdata']:
            return fail(
                _(b'mismatch between expect data flag and previous frame')
            )

        pending[b'payload'].write(frame.payload)

        if moreframes:
            return self._makewantframeresult()

        pending[b'requestdone'] = True

        if expectingdata:
            return self._makewantframeresult()

        return self._makeruncommandresult(requestid)

    if typeid == FRAME_TYPE_COMMAND_DATA:
        if not pending[b'expectingdata']:
            return fail(
                _(
                    b'received command data frame for request that is not '
                    b'expecting data: %d'
                )
                % requestid
            )

        # The data buffer is created when the first data frame arrives.
        if pending[b'data'] is None:
            pending[b'data'] = util.bytesio()

        return self._handlecommanddataframe(frame, pending)

    return fail(_(b'received unexpected frame type: %d') % typeid)
Gregory Szorc
|
r37070 | |||
Gregory Szorc
|
def _handlecommanddataframe(self, frame, entry):
    """Buffer a command data frame and report what the caller should do.

    ``frame`` must be a command data frame. ``entry`` is the per-request
    bookkeeping dict; the frame payload is appended to its ``data``
    buffer.

    Returns the "want frame" result when the continuation flag is set,
    the "run command" result when the end-of-stream flag is set, and an
    error result (after marking the reactor as errored) when neither
    flag is present.
    """
    assert frame.typeid == FRAME_TYPE_COMMAND_DATA

    # TODO support streaming data instead of buffering it.
    databuf = entry[b'data']
    databuf.write(frame.payload)

    flags = frame.flags

    # The continuation flag is checked first, so it wins if both are set.
    if flags & FLAG_COMMAND_DATA_CONTINUATION:
        return self._makewantframeresult()

    if flags & FLAG_COMMAND_DATA_EOS:
        # Rewind the buffer before handing the request off.
        databuf.seek(0)
        return self._makeruncommandresult(frame.requestid)

    self._state = b'errored'
    return self._makeerrorresult(_(b'command data frame without flags'))
Gregory Szorc
|
r37070 | |||
Gregory Szorc
|
def _onframeerrored(self, frame):
    """Frame handler that unconditionally reports an error.

    ``frame`` is not inspected; the returned error result always carries
    the "server already errored" message.
    """
    message = _(b'server already errored')
    return self._makeerrorresult(message)
Gregory Szorc
|
r37561 | |||
Augie Fackler
|
r43346 | |||
Gregory Szorc
|
class commandrequest:
    """Represents a request to run a command.

    Instances simply hold the values passed to the constructor plus a
    ``state`` attribute, which starts out as ``b'pending'``.
    """

    def __init__(self, requestid, name, args, datafh=None, redirect=None):
        """Record the request parameters and mark the request as pending."""
        self.requestid, self.name, self.args = requestid, name, args
        self.datafh, self.redirect = datafh, redirect
        self.state = b'pending'
Gregory Szorc
|
r37561 | |||
Augie Fackler
|
r43346 | |||
Gregory Szorc
|
r49801 | class clientreactor: | ||
Gregory Szorc
|
r37561 | """Holds state of a client issuing frame-based protocol requests. | ||
This is like ``serverreactor`` but for client-side state. | ||||
Each instance is bound to the lifetime of a connection. For persistent | ||||
connection transports using e.g. TCP sockets and speaking the raw | ||||
framing protocol, there will be a single instance for the lifetime of | ||||
the TCP socket. For transports where there are multiple discrete | ||||
interactions (say tunneled within an HTTP request), there will be a | ||||
separate instance for each distinct interaction. | ||||
Gregory Szorc
|
r40163 | |||
Consumers are expected to tell instances when events occur by calling | ||||
various methods. These methods return a 2-tuple describing any follow-up | ||||
action(s) to take. The first element is the name of an action to | ||||
perform. The second is a data structure (usually a dict) specific to | ||||
that action that contains more information. e.g. if the reactor wants | ||||
to send frames to the server, the data structure will contain a reference | ||||
to those frames. | ||||
Valid actions that consumers can be instructed to take are: | ||||
noop | ||||
Indicates no additional action is required. | ||||
sendframes | ||||
Indicates that frames should be sent to the server. The ``framegen`` | ||||
key contains a generator of frames that should be sent. The reactor | ||||
assumes that all frames in this generator are sent to the server. | ||||
error | ||||
Indicates that an error occurred. The ``message`` key contains an | ||||
error message describing the failure. | ||||
responsedata | ||||
Indicates a response to a previously-issued command was received. | ||||
The ``request`` key contains the ``commandrequest`` instance that | ||||
represents the request this data is for. | ||||
The ``data`` key contains the decoded data from the server. | ||||
``expectmore`` and ``eos`` evaluate to True when more response data | ||||
is expected to follow or we're at the end of the response stream, | ||||
respectively. | ||||
Gregory Szorc
|
r37561 | """ | ||
Augie Fackler
|
r43346 | |||
def __init__(
    self,
    ui,
    hasmultiplesend=False,
    buffersends=True,
    clientcontentencoders=None,
):
    """Create a new instance.

    ``ui`` is stored and later passed along when a stream decoder is
    configured.

    ``hasmultiplesend`` indicates whether multiple sends are supported
    by the transport. When True, it is possible to send commands
    immediately instead of buffering until the caller signals an intent
    to finish a send operation.

    ``buffersends`` indicates whether sends should be buffered until the
    last request has been issued.

    ``clientcontentencoders`` is an iterable of content encoders the
    client will advertise to the server and that the server can use for
    encoding data. If not defined, the client will not advertise content
    encoders to the server.
    """
    # Caller-supplied configuration.
    self._ui = ui
    self._hasmultiplesend = hasmultiplesend
    self._buffersends = buffersends
    self._clientcontentencoders = clientcontentencoders

    # A fresh reactor may both queue commands and send frames; nothing
    # has been advertised to the server yet.
    self._canissuecommands = self._cansend = True
    self._protocolsettingssent = False

    self._nextrequestid = 1

    # We only support a single outgoing stream for now.
    self._outgoingstream = outputstream(1)

    # Requests waiting to be flushed, and requests already handed to the
    # frame generator, respectively.
    self._pendingrequests = collections.deque()
    self._activerequests = {}

    # Per-stream state, keyed by stream ID.
    self._incomingstreams = {}
    self._streamsettingsdecoders = {}

    populatestreamencoders()
Gregory Szorc
|
def callcommand(self, name, args, datafh=None, redirect=None):
    """Request that a command be executed.

    ``name`` is the command name, ``args`` a dict of arguments to pass
    to the command, and ``datafh`` an optional file object containing
    the raw data for the command. ``redirect`` is stored on the request
    and forwarded when the command frames are created.

    Returns a 3-tuple of (request, action, action data). When sends are
    buffered the action is ``noop``; otherwise it is ``sendframes`` with
    a ``framegen`` generator of frames to transmit.

    Raises ``error.ProgrammingError`` if new commands can no longer be
    issued, or if an unbuffered send is attempted on an instance that
    can no longer send.
    """
    if not self._canissuecommands:
        raise error.ProgrammingError(b'cannot issue new commands')

    # Request IDs advance by two for every issued command.
    requestid = self._nextrequestid
    self._nextrequestid = requestid + 2

    request = commandrequest(
        requestid, name, args, datafh=datafh, redirect=redirect
    )

    if self._buffersends:
        # Queue the request; flushcommands() will emit its frames later.
        self._pendingrequests.append(request)
        return request, b'noop', {}

    if not self._cansend:
        raise error.ProgrammingError(
            b'sends cannot be performed on this instance'
        )

    # Without multiple-send support this send is the only one allowed.
    if not self._hasmultiplesend:
        self._cansend = False
        self._canissuecommands = False

    framegen = self._makecommandframes(request)
    return request, b'sendframes', {b'framegen': framegen}
Gregory Szorc
|
r37561 | |||
def flushcommands(self):
    """Request that all queued commands be sent.

    If any commands are buffered, this instructs the caller to send them
    over the wire by returning a ``sendframes`` action whose ``framegen``
    lazily drains the queue. If nothing is buffered, a ``noop`` action
    is returned.

    If the instance isn't configured for multiple sends, no new command
    requests are allowed after this is called.

    Raises ``error.ProgrammingError`` if commands are queued but the
    instance can no longer send.
    """
    if not self._pendingrequests:
        return b'noop', {}

    if not self._cansend:
        raise error.ProgrammingError(
            b'sends cannot be performed on this instance'
        )

    # If the instance only allows sending once, mark that we have fired
    # our one shot.
    if not self._hasmultiplesend:
        self._canissuecommands = False
        self._cansend = False

    def makeframes():
        # Requests are dequeued only as the caller consumes the
        # generator.
        while self._pendingrequests:
            queued = self._pendingrequests.popleft()
            yield from self._makecommandframes(queued)

    return b'sendframes', {b'framegen': makeframes()}
Gregory Szorc
|
r37561 | |||
def _makecommandframes(self, request):
    """Emit frames to issue a command request.

    This is a generator. As a side-effect, request accounting is updated
    to reflect the changed state: the request is registered as active
    and marked ``sending`` when iteration starts, and marked ``sent``
    once every frame has been yielded.

    The first time this runs on an instance configured with client
    content encoders, a sender protocol settings frame advertising them
    is emitted ahead of the command frames.
    """
    self._activerequests[request.requestid] = request
    request.state = b'sending'

    encoders = self._clientcontentencoders
    if encoders and not self._protocolsettingssent:
        self._protocolsettingssent = True

        settings = {b'contentencodings': encoders}
        payload = b''.join(cborutil.streamencode(settings))

        yield self._outgoingstream.makeframe(
            requestid=request.requestid,
            typeid=FRAME_TYPE_SENDER_PROTOCOL_SETTINGS,
            flags=FLAG_SENDER_PROTOCOL_SETTINGS_EOS,
            payload=payload,
        )

    yield from createcommandframes(
        self._outgoingstream,
        request.requestid,
        request.name,
        request.args,
        datafh=request.datafh,
        redirect=request.redirect,
    )

    request.state = b'sent'
Gregory Szorc
|
r37562 | |||
def onframerecv(self, frame):
    """Process a frame that has been received off the wire.

    Returns a 2-tuple of (action, meta) describing further action the
    caller needs to take as a result of receiving this frame.

    An ``error`` action is returned for frames on odd-numbered streams,
    frames on an unknown stream lacking the begin-stream flag, and
    frames whose request ID is not active. Stream settings frames are
    delegated to ``_onstreamsettingsframe``; command and error response
    frames go to their respective handlers.

    Raises ``error.ProgrammingError`` for any other frame type.
    """
    streamid = frame.streamid

    if streamid % 2:
        message = (
            _(b'received frame with odd numbered stream ID: %d') % streamid
        )
        return b'error', {b'message': message}

    streams = self._incomingstreams

    if streamid not in streams:
        if not frame.streamflags & STREAM_FLAG_BEGIN_STREAM:
            message = _(
                b'received frame on unknown stream '
                b'without beginning of stream flag set'
            )
            return b'error', {b'message': message}

        streams[streamid] = inputstream(streamid)

    stream = streams[streamid]

    # If the payload is encoded, ask the stream to decode it. We
    # merely substitute the decoded result into the frame payload as
    # if it had been transferred all along.
    if frame.streamflags & STREAM_FLAG_ENCODING_APPLIED:
        frame.payload = stream.decode(frame.payload)

    if frame.streamflags & STREAM_FLAG_END_STREAM:
        del streams[streamid]

    if frame.typeid == FRAME_TYPE_STREAM_SETTINGS:
        return self._onstreamsettingsframe(frame)

    try:
        request = self._activerequests[frame.requestid]
    except KeyError:
        message = (
            _(b'received frame for inactive request ID: %d')
            % frame.requestid
        )
        return b'error', {b'message': message}

    # The state is updated before the frame type is validated.
    request.state = b'receiving'

    if frame.typeid == FRAME_TYPE_COMMAND_RESPONSE:
        handler = self._oncommandresponseframe
    elif frame.typeid == FRAME_TYPE_ERROR_RESPONSE:
        handler = self._onerrorresponseframe
    else:
        raise error.ProgrammingError(
            b'unhandled frame type: %d' % frame.typeid
        )

    return handler(request, frame)
Gregory Szorc
|
def _onstreamsettingsframe(self, frame):
    """Handle a stream encoding settings frame.

    Payloads are fed into a per-stream buffering CBOR decoder. Frames
    carrying the continuation flag only accumulate data and yield a
    ``noop`` action. On the end-of-stream frame the decoded values are
    used to configure the decoder of the incoming stream: the first
    value is passed as one argument and the remaining values as another.

    Returns a 2-tuple of (action, meta). The action is ``error`` when
    the flags are inconsistent, the CBOR cannot be decoded, no values
    were decoded, or configuring the stream decoder fails; otherwise it
    is ``noop``.
    """
    assert frame.typeid == FRAME_TYPE_STREAM_SETTINGS

    def fail(message):
        # Build the error action tuple returned to the caller.
        return b'error', {b'message': message}

    more = frame.flags & FLAG_STREAM_ENCODING_SETTINGS_CONTINUATION
    eos = frame.flags & FLAG_STREAM_ENCODING_SETTINGS_EOS

    if more and eos:
        return fail(
            _(
                b'stream encoding settings frame cannot have both '
                b'continuation and end of stream flags set'
            )
        )

    if not (more or eos):
        return fail(
            _(
                b'stream encoding settings frame must have '
                b'continuation or end of stream flag set'
            )
        )

    decoders = self._streamsettingsdecoders
    streamid = frame.streamid

    # Lazily create the decoder the first time a stream sends settings.
    try:
        decoder = decoders[streamid]
    except KeyError:
        decoder = decoders[streamid] = cborutil.bufferingdecoder()

    try:
        decoder.decode(frame.payload)
    except Exception as e:
        return fail(
            _(
                b'error decoding CBOR from stream encoding '
                b'settings frame: %s'
            )
            % stringutil.forcebytestr(e)
        )

    if more:
        return b'noop', {}

    # Exactly one of the two flags is set at this point.
    assert eos

    decoded = decoder.getavailable()
    del decoders[streamid]

    if not decoded:
        return fail(
            _(
                b'stream encoding settings frame did not contain '
                b'CBOR data'
            )
        )

    try:
        self._incomingstreams[frame.streamid].setdecoder(
            self._ui, decoded[0], decoded[1:]
        )
    except Exception as e:
        return fail(
            _(b'error setting stream decoder: %s')
            % stringutil.forcebytestr(e)
        )

    return b'noop', {}
Gregory Szorc
|
r40164 | |||
Gregory Szorc
|
def _oncommandresponseframe(self, request, frame):
    """Handle a command response frame for an active request.

    When the end-of-stream flag is set, the request is marked
    ``received`` and dropped from the active requests.

    Returns a ``responsedata`` action whose meta dict carries the
    request, the raw continuation and end-of-stream flag bits
    (``expectmore`` / ``eos``) and the frame payload (``data``).
    """
    eos = frame.flags & FLAG_COMMAND_RESPONSE_EOS

    if eos:
        request.state = b'received'
        del self._activerequests[request.requestid]

    meta = {
        b'request': request,
        b'expectmore': frame.flags & FLAG_COMMAND_RESPONSE_CONTINUATION,
        b'eos': eos,
        b'data': frame.payload,
    }
    return b'responsedata', meta
Gregory Szorc
|
r37744 | |||
def _onerrorresponseframe(self, request, frame):
    """Handle an error response frame for an active request.

    The request is marked ``errored`` and dropped from the active
    requests before the payload is decoded.

    Returns an ``error`` action whose meta dict carries the request
    together with the ``type`` and ``message`` entries of the decoded
    payload.
    """
    request.state = b'errored'
    del self._activerequests[request.requestid]

    # The payload should be a CBOR map; only the first decoded value is
    # consulted.
    decoded = cborutil.decodeall(frame.payload)
    errinfo = decoded[0]

    meta = {
        b'request': request,
        b'type': errinfo[b'type'],
        b'message': errinfo[b'message'],
    }
    return b'error', meta