##// END OF EJS Templates
win32mbcs: fix a `str` type conditional for py3...
win32mbcs: fix a `str` type conditional for py3 Differential Revision: https://phab.mercurial-scm.org/D7535

File last commit:

r43812:2fe6121c default
r44181:66210a20 stable
Show More
wireprotoframing.py
2070 lines | 64.7 KiB | text/x-python | PythonLexer
Gregory Szorc
wireproto: define and implement protocol for issuing requests...
r37069 # wireprotoframing.py - unified framing protocol for wire protocol
#
# Copyright 2018 Gregory Szorc <gregory.szorc@gmail.com>
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.
# This file contains functionality to support the unified frame-based wire
# protocol. For details about the protocol, see
# `hg help internals.wireprotocol`.
from __future__ import absolute_import
Gregory Szorc
wireproto: introduce a reactor for client-side state...
r37561 import collections
Gregory Szorc
wireproto: define and implement protocol for issuing requests...
r37069 import struct
Gregory Szorc
wireproto: implement basic frame reading and processing...
r37070 from .i18n import _
Gregory Szorc
py3: manually import getattr where it is needed...
r43359 from .pycompat import getattr
Augie Fackler
formatting: blacken the codebase...
r43346 from .thirdparty import attr
Gregory Szorc
wireproto: define and implement protocol for issuing requests...
r37069 from . import (
Yuya Nishihara
py3: system-stringify repr(frame)...
r37492 encoding,
Gregory Szorc
wireproto: implement basic frame reading and processing...
r37070 error,
Gregory Szorc
wireprotov2: server support for sending content redirects...
r40061 pycompat,
Gregory Szorc
wireproto: define and implement protocol for issuing requests...
r37069 util,
Gregory Szorc
wireprotov2: define type to represent pre-encoded object...
r40056 wireprototypes,
Gregory Szorc
wireproto: define and implement protocol for issuing requests...
r37069 )
Yuya Nishihara
stringutil: bulk-replace call sites to point to new module...
r37102 from .utils import (
Gregory Szorc
wireprotoframing: use our CBOR module...
r39477 cborutil,
Yuya Nishihara
stringutil: bulk-replace call sites to point to new module...
r37102 stringutil,
)
Gregory Szorc
wireproto: define and implement protocol for issuing requests...
r37069
Gregory Szorc
wireproto: add streams to frame-based protocol...
# Number of bytes occupied by the fixed-size header that precedes every
# frame payload.
FRAME_HEADER_SIZE = 8

# Default cap on the number of payload bytes placed in a single frame.
DEFAULT_MAX_FRAME_SIZE = 32768

# Stream flags carried in the frame header, plus their human-readable names.
STREAM_FLAG_BEGIN_STREAM = 0x01
STREAM_FLAG_END_STREAM = 0x02
STREAM_FLAG_ENCODING_APPLIED = 0x04

STREAM_FLAGS = {
    b'stream-begin': STREAM_FLAG_BEGIN_STREAM,
    b'stream-end': STREAM_FLAG_END_STREAM,
    b'encoded': STREAM_FLAG_ENCODING_APPLIED,
}

# Frame type identifiers, plus their human-readable names.
FRAME_TYPE_COMMAND_REQUEST = 0x01
FRAME_TYPE_COMMAND_DATA = 0x02
FRAME_TYPE_COMMAND_RESPONSE = 0x03
FRAME_TYPE_ERROR_RESPONSE = 0x05
FRAME_TYPE_TEXT_OUTPUT = 0x06
FRAME_TYPE_PROGRESS = 0x07
FRAME_TYPE_SENDER_PROTOCOL_SETTINGS = 0x08
FRAME_TYPE_STREAM_SETTINGS = 0x09

FRAME_TYPES = {
    b'command-request': FRAME_TYPE_COMMAND_REQUEST,
    b'command-data': FRAME_TYPE_COMMAND_DATA,
    b'command-response': FRAME_TYPE_COMMAND_RESPONSE,
    b'error-response': FRAME_TYPE_ERROR_RESPONSE,
    b'text-output': FRAME_TYPE_TEXT_OUTPUT,
    b'progress': FRAME_TYPE_PROGRESS,
    b'sender-protocol-settings': FRAME_TYPE_SENDER_PROTOCOL_SETTINGS,
    b'stream-settings': FRAME_TYPE_STREAM_SETTINGS,
}

# Flags for command request frames.
FLAG_COMMAND_REQUEST_NEW = 0x01
FLAG_COMMAND_REQUEST_CONTINUATION = 0x02
FLAG_COMMAND_REQUEST_MORE_FRAMES = 0x04
FLAG_COMMAND_REQUEST_EXPECT_DATA = 0x08

FLAGS_COMMAND_REQUEST = {
    b'new': FLAG_COMMAND_REQUEST_NEW,
    b'continuation': FLAG_COMMAND_REQUEST_CONTINUATION,
    b'more': FLAG_COMMAND_REQUEST_MORE_FRAMES,
    b'have-data': FLAG_COMMAND_REQUEST_EXPECT_DATA,
}

# Flags for command data frames.
FLAG_COMMAND_DATA_CONTINUATION = 0x01
FLAG_COMMAND_DATA_EOS = 0x02

FLAGS_COMMAND_DATA = {
    b'continuation': FLAG_COMMAND_DATA_CONTINUATION,
    b'eos': FLAG_COMMAND_DATA_EOS,
}

# Flags for command response frames.
FLAG_COMMAND_RESPONSE_CONTINUATION = 0x01
FLAG_COMMAND_RESPONSE_EOS = 0x02

FLAGS_COMMAND_RESPONSE = {
    b'continuation': FLAG_COMMAND_RESPONSE_CONTINUATION,
    b'eos': FLAG_COMMAND_RESPONSE_EOS,
}

# Flags for sender protocol settings frames.
FLAG_SENDER_PROTOCOL_SETTINGS_CONTINUATION = 0x01
FLAG_SENDER_PROTOCOL_SETTINGS_EOS = 0x02

FLAGS_SENDER_PROTOCOL_SETTINGS = {
    b'continuation': FLAG_SENDER_PROTOCOL_SETTINGS_CONTINUATION,
    b'eos': FLAG_SENDER_PROTOCOL_SETTINGS_EOS,
}

# Flags for stream (encoding) settings frames.
FLAG_STREAM_ENCODING_SETTINGS_CONTINUATION = 0x01
FLAG_STREAM_ENCODING_SETTINGS_EOS = 0x02

FLAGS_STREAM_ENCODING_SETTINGS = {
    b'continuation': FLAG_STREAM_ENCODING_SETTINGS_CONTINUATION,
    b'eos': FLAG_STREAM_ENCODING_SETTINGS_EOS,
}

# Maps frame types to their available flags.
FRAME_TYPE_FLAGS = {
    FRAME_TYPE_COMMAND_REQUEST: FLAGS_COMMAND_REQUEST,
    FRAME_TYPE_COMMAND_DATA: FLAGS_COMMAND_DATA,
    FRAME_TYPE_COMMAND_RESPONSE: FLAGS_COMMAND_RESPONSE,
    FRAME_TYPE_ERROR_RESPONSE: {},
    FRAME_TYPE_TEXT_OUTPUT: {},
    FRAME_TYPE_PROGRESS: {},
    FRAME_TYPE_SENDER_PROTOCOL_SETTINGS: FLAGS_SENDER_PROTOCOL_SETTINGS,
    FRAME_TYPE_STREAM_SETTINGS: FLAGS_STREAM_ENCODING_SETTINGS,
}

# Two little-endian unsigned 16-bit integers.
ARGUMENT_RECORD_HEADER = struct.Struct(r'<HH')
Gregory Szorc
wireproto: define and implement protocol for issuing requests...
r37069
Augie Fackler
formatting: blacken the codebase...
r43346
Gregory Szorc
wireproto: implement custom __repr__ for frame...
def humanflags(mapping, value):
    """Render a numeric flags value as ``|``-delimited flag names.

    ``mapping`` associates flag names with their bit values. Every bit set
    in ``value`` is looked up by bit value; a set bit with no entry in
    ``mapping`` is rendered as ``<unknown 0xNN>``. The result is empty when
    no bits are set.
    """
    # Invert the table so names can be looked up by bit value.
    bitnames = {bitvalue: name for name, bitvalue in pycompat.iteritems(mapping)}

    rendered = []
    bit = 1
    # Walk the bits from least significant upwards until we pass the value.
    while bit <= value:
        if bit & value:
            rendered.append(bitnames.get(bit, b'<unknown 0x%02x>' % bit))
        bit <<= 1

    return b'|'.join(rendered)
Augie Fackler
formatting: blacken the codebase...
r43346
Gregory Szorc
wireproto: define attr-based classes for representing frames...
@attr.s(slots=True)
class frameheader(object):
    """Represents the data in a frame header.

    Attribute order matters: instances are constructed positionally.
    """

    # Number of payload bytes that follow the header.
    length = attr.ib()
    # Identifier of the request this frame belongs to.
    requestid = attr.ib()
    # Identifier of the stream this frame belongs to.
    streamid = attr.ib()
    # Bitwise OR of stream flag values.
    streamflags = attr.ib()
    # Numeric frame type.
    typeid = attr.ib()
    # Bitwise OR of frame-type-specific flag values.
    flags = attr.ib()
Augie Fackler
formatting: blacken the codebase...
r43346
Gregory Szorc
wireproto: implement custom __repr__ for frame...
@attr.s(slots=True, repr=False)
class frame(object):
    """Represents a parsed frame.

    Attribute order matters: instances are constructed positionally.
    """

    requestid = attr.ib()
    streamid = attr.ib()
    streamflags = attr.ib()
    typeid = attr.ib()
    flags = attr.ib()
    payload = attr.ib()

    @encoding.strmethod
    def __repr__(self):
        """Describe the frame using symbolic type and flag names."""
        # Computed up front so an unrecognized type id still renders.
        fallback = b'<unknown 0x%02x>' % self.typeid
        typename = next(
            (
                name
                for name, value in pycompat.iteritems(FRAME_TYPES)
                if value == self.typeid
            ),
            fallback,
        )

        fields = (
            len(self.payload),
            self.requestid,
            self.streamid,
            humanflags(STREAM_FLAGS, self.streamflags),
            typename,
            humanflags(FRAME_TYPE_FLAGS.get(self.typeid, {}), self.flags),
        )
        return (
            b'frame(size=%d; request=%d; stream=%d; streamflags=%s; '
            b'type=%s; flags=%s)' % fields
        )
Gregory Szorc
wireproto: implement custom __repr__ for frame...
r37314
Gregory Szorc
wireproto: add streams to frame-based protocol...
def makeframe(requestid, streamid, streamflags, typeid, flags, payload):
    """Assemble a frame into a byte array.

    The returned ``bytearray`` holds the ``FRAME_HEADER_SIZE`` byte header
    followed by ``payload``. The header layout is:

    * 24 bits payload length (little endian)
    * 16 bits request id (little endian)
    * 8 bits stream id
    * 8 bits stream flags
    * 4 bits frame type
    * 4 bits frame flags

    Raises ``ValueError`` if the payload is too large for its length to be
    represented in the 24-bit length field.
    """
    length = len(payload)

    # Only the low 3 bytes of the packed length are written to the header,
    # so a larger payload would silently produce a frame whose declared
    # length does not match its contents.
    if length > 0xFFFFFF:
        raise ValueError(
            'frame payload of %d bytes does not fit in 24-bit length field'
            % length
        )

    buf = bytearray(FRAME_HEADER_SIZE + length)

    buf[0:3] = struct.pack(r'<I', length)[0:3]
    struct.pack_into(r'<HBB', buf, 3, requestid, streamid, streamflags)
    # Frame type lives in the high nibble, frame flags in the low nibble.
    buf[7] = (typeid << 4) | flags
    buf[8:] = payload

    return buf
Augie Fackler
formatting: blacken the codebase...
r43346
Gregory Szorc
wireproto: define and implement protocol for issuing requests...
def makeframefromhumanstring(s):
    """Create a frame from a human readable string

    Strings have the form:

        <request-id> <stream-id> <stream-flags> <type> <flags> <payload>

    This can be used by user-facing applications and tests for creating
    frames easily without having to type out a bunch of constants.

    Request ID and stream IDs are integers.

    Stream flags, frame type, and flags can be specified by integer or
    named constant.

    Flags can be delimited by `|` to bitwise OR them together.

    If the payload begins with ``cbor:``, the following string will be
    evaluated as Python literal and the resulting object will be fed into
    a CBOR encoder. Otherwise, the payload is interpreted as a Python
    byte string literal.
    """

    def resolveflags(spec, names):
        # Each `|`-delimited token is either a known flag name or an
        # integer literal; OR all of them together.
        bits = 0
        for token in spec.split(b'|'):
            bits |= names[token] if token in names else int(token)
        return bits

    # Only the first 5 spaces delimit fields; the payload may contain more.
    (
        requestid,
        streamid,
        streamflags,
        frametype,
        frameflags,
        payload,
    ) = s.split(b' ', 5)

    requestid = int(requestid)
    streamid = int(streamid)
    finalstreamflags = resolveflags(streamflags, STREAM_FLAGS)

    if frametype in FRAME_TYPES:
        frametype = FRAME_TYPES[frametype]
    else:
        frametype = int(frametype)

    finalflags = resolveflags(frameflags, FRAME_TYPE_FLAGS[frametype])

    if payload.startswith(b'cbor:'):
        literal = stringutil.evalpythonliteral(payload[5:])
        payload = b''.join(cborutil.streamencode(literal))
    else:
        payload = stringutil.unescapestr(payload)

    return makeframe(
        requestid=requestid,
        streamid=streamid,
        streamflags=finalstreamflags,
        typeid=frametype,
        flags=finalflags,
        payload=payload,
    )
Gregory Szorc
wireproto: define and implement protocol for issuing requests...
r37069
Gregory Szorc
wireproto: implement basic frame reading and processing...
def parseheader(data):
    """Parse a unified framing protocol frame header from a buffer.

    The header is expected to be in the buffer at offset 0 and the
    buffer is expected to be large enough to hold a full header.

    ``data`` must yield integers when indexed (e.g. a ``bytearray``).

    Returns a ``frameheader`` describing the decoded fields.
    """
    # 24 bits payload length (little endian)
    # 16 bits request ID
    # 8 bits stream ID
    # 8 bits stream flags
    # 4 bits frame type
    # 4 bits frame flags
    # ... payload

    # The third length byte carries bits 16-23, so it is weighted by
    # 2**16 (65536).
    framelength = data[0] + (data[1] << 8) + (data[2] << 16)
    requestid, streamid, streamflags = struct.unpack_from(r'<HBB', data, 3)
    typeflags = data[7]

    # High nibble is the frame type, low nibble is the frame flags.
    frametype = (typeflags & 0xF0) >> 4
    frameflags = typeflags & 0x0F

    return frameheader(
        framelength, requestid, streamid, streamflags, frametype, frameflags
    )
Gregory Szorc
wireproto: implement basic frame reading and processing...
r37070
def readframe(fh):
    """Read a unified framing protocol frame from a file object.

    ``fh`` must provide ``readinto()`` and ``read()``.

    Returns a ``frame`` instance for the decoded frame, or None if no
    header bytes could be read. Raises ``error.Abort`` if only part of a
    header is read or if fewer payload bytes are available than the
    header declares.
    """
    headerbuf = bytearray(FRAME_HEADER_SIZE)
    got = fh.readinto(headerbuf)

    # Nothing at all was read: treat as "no frame available".
    if got == 0:
        return None

    if got != FRAME_HEADER_SIZE:
        raise error.Abort(
            _(b'received incomplete frame: got %d bytes: %s')
            % (got, headerbuf)
        )

    parsed = parseheader(headerbuf)

    body = fh.read(parsed.length)
    if len(body) != parsed.length:
        raise error.Abort(
            _(b'frame length error: expected %d; got %d')
            % (parsed.length, len(body))
        )

    return frame(
        parsed.requestid,
        parsed.streamid,
        parsed.streamflags,
        parsed.typeid,
        parsed.flags,
        body,
    )
Gregory Szorc
wireproto: implement basic frame reading and processing...
r37070
Augie Fackler
formatting: blacken the codebase...
def createcommandframes(
    stream,
    requestid,
    cmd,
    args,
    datafh=None,
    maxframesize=DEFAULT_MAX_FRAME_SIZE,
    redirect=None,
):
    """Create frames necessary to transmit a request to run a command.

    This is a generator of bytearrays. Each item represents a frame
    ready to be sent over the wire to a peer.

    ``stream`` must provide ``makeframe()``, which is used to build every
    frame. ``cmd``, ``args`` and ``redirect`` are CBOR encoded into a map
    under the ``name``, ``args`` and ``redirect`` keys respectively (the
    latter two only when truthy) and emitted as one or more command request
    frames. If ``datafh`` is given, its content is then read and emitted as
    command data frames.

    No frame payload, request or data, exceeds ``maxframesize`` bytes.
    """
    request = {b'name': cmd}
    if args:
        request[b'args'] = args

    if redirect:
        request[b'redirect'] = redirect

    encoded = b''.join(cborutil.streamencode(request))

    offset = 0

    while True:
        flags = 0

        # Must set new or continuation flag.
        if not offset:
            flags |= FLAG_COMMAND_REQUEST_NEW
        else:
            flags |= FLAG_COMMAND_REQUEST_CONTINUATION

        # The "expect data" flag is set on all request frames.
        if datafh:
            flags |= FLAG_COMMAND_REQUEST_EXPECT_DATA

        payload = encoded[offset : offset + maxframesize]
        offset += len(payload)

        if len(payload) == maxframesize and offset < len(encoded):
            flags |= FLAG_COMMAND_REQUEST_MORE_FRAMES

        yield stream.makeframe(
            requestid=requestid,
            typeid=FRAME_TYPE_COMMAND_REQUEST,
            flags=flags,
            payload=payload,
        )

        if not (flags & FLAG_COMMAND_REQUEST_MORE_FRAMES):
            break

    if datafh:
        while True:
            # Honor the caller's frame size limit for data frames too.
            data = datafh.read(maxframesize)

            done = False
            if len(data) == maxframesize:
                flags = FLAG_COMMAND_DATA_CONTINUATION
            else:
                # A short read is taken to mean end of data; the assertion
                # double checks that nothing else can be read.
                flags = FLAG_COMMAND_DATA_EOS
                assert datafh.read(1) == b''
                done = True

            yield stream.makeframe(
                requestid=requestid,
                typeid=FRAME_TYPE_COMMAND_DATA,
                flags=flags,
                payload=data,
            )

            if done:
                break
Gregory Szorc
wireproto: implement basic frame reading and processing...
r37070
Augie Fackler
formatting: blacken the codebase...
r43346
Gregory Szorc
wireprotov2: implement commands as a generator of objects...
def createcommandresponseokframe(stream, requestid):
    """Create the frame announcing that a command completed successfully.

    The payload is the CBOR encoding of ``{b'status': b'ok'}``. If the
    stream has already sent its settings, the payload is passed through
    ``stream.encode()`` and the frame is marked as encoded.

    Returns None if encoding yields no data.
    """
    payload = b''.join(cborutil.streamencode({b'status': b'ok'}))
    encoded = False

    if stream.streamsettingssent:
        encoded = True
        payload = stream.encode(payload)

        # The encoder produced no output: there is nothing to put in a
        # frame.
        if not payload:
            return None

    return stream.makeframe(
        requestid=requestid,
        typeid=FRAME_TYPE_COMMAND_RESPONSE,
        flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
        payload=payload,
        encoded=encoded,
    )
Gregory Szorc
wireprotov2: implement commands as a generator of objects...
r39595
Augie Fackler
formatting: blacken the codebase...
r43346
def createcommandresponseeosframes(
    stream, requestid, maxframesize=DEFAULT_MAX_FRAME_SIZE
):
    """Create frames representing command end-of-stream.

    Whatever ``stream.flush()`` returns is split into chunks of at most
    ``maxframesize`` bytes. Every frame but the last carries the
    continuation flag; the last one carries the end-of-stream flag. When
    the flush returns nothing, a single empty end-of-stream frame is
    emitted.
    """
    payload = stream.flush()

    # Frames are flagged as encoded only if the flush produced data.
    encoded = payload != b''
    total = len(payload)
    consumed = 0

    while True:
        chunk = payload[consumed : consumed + maxframesize]
        consumed += len(chunk)
        final = consumed == total

        yield stream.makeframe(
            requestid=requestid,
            typeid=FRAME_TYPE_COMMAND_RESPONSE,
            flags=(
                FLAG_COMMAND_RESPONSE_EOS
                if final
                else FLAG_COMMAND_RESPONSE_CONTINUATION
            ),
            payload=chunk,
            encoded=encoded,
        )

        if final:
            return
Gregory Szorc
wireprotov2: add support for more response types...
r37746
Augie Fackler
formatting: blacken the codebase...
r43346
Gregory Szorc
wireprotov2: server support for sending content redirects...
def createalternatelocationresponseframe(stream, requestid, location):
    """Create the frame redirecting the peer to an alternate location.

    The payload is a CBOR map with a ``redirect`` status and a ``location``
    map holding the ``url`` and ``mediatype`` of ``location``, plus each of
    its optional attributes whose value is not None. If the stream has
    already sent its settings, the payload is passed through
    ``stream.encode()`` and the frame is marked as encoded.
    """
    target = {
        b'url': location.url,
        b'mediatype': location.mediatype,
    }
    response = {
        b'status': b'redirect',
        b'location': target,
    }

    optionalattrs = (
        r'size',
        r'fullhashes',
        r'fullhashseed',
        r'serverdercerts',
        r'servercadercerts',
    )
    for attrname in optionalattrs:
        attrvalue = getattr(location, attrname)
        if attrvalue is not None:
            target[pycompat.bytestr(attrname)] = attrvalue

    payload = b''.join(cborutil.streamencode(response))

    encoded = False
    if stream.streamsettingssent:
        payload = stream.encode(payload)
        encoded = True

    return stream.makeframe(
        requestid=requestid,
        typeid=FRAME_TYPE_COMMAND_RESPONSE,
        flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
        payload=payload,
        encoded=encoded,
    )
Gregory Szorc
wireprotov2: server support for sending content redirects...
r40061
Gregory Szorc
wireprotov2: add support for more response types...
def createcommanderrorresponse(stream, requestid, message, args=None):
    """Generate the command response frame reporting a command error.

    Yields a single end-of-stream command response frame whose payload is
    a CBOR map with an ``error`` status and an ``error`` map holding
    ``message`` and, when truthy, ``args``.
    """
    # TODO should this be using a list of {'msg': ..., 'args': {}} so atom
    # formatting works consistently?
    details = {
        b'message': message,
    }
    response = {b'status': b'error', b'error': details}

    if args:
        details[b'args'] = args

    payload = b''.join(cborutil.streamencode(response))

    yield stream.makeframe(
        requestid=requestid,
        typeid=FRAME_TYPE_COMMAND_RESPONSE,
        flags=FLAG_COMMAND_RESPONSE_EOS,
        payload=payload,
    )
Gregory Szorc
wireprotov2: add support for more response types...
r37746
Gregory Szorc
wireprotov2: change behavior of error frame...
def createerrorframe(stream, requestid, msg, errtype):
    """Generate an error response frame.

    Yields a single flag-less error response frame whose payload is a CBOR
    map holding ``errtype`` under ``type`` and ``msg`` wrapped in a
    one-element list under ``message``.
    """
    # TODO properly handle frame size limits.
    assert len(msg) <= DEFAULT_MAX_FRAME_SIZE

    body = {
        b'type': errtype,
        b'message': [{b'msg': msg}],
    }
    payload = b''.join(cborutil.streamencode(body))

    yield stream.makeframe(
        requestid=requestid,
        typeid=FRAME_TYPE_ERROR_RESPONSE,
        flags=0,
        payload=payload,
    )
Gregory Szorc
wireproto: define and implement responses in framing protocol...
r37073
Augie Fackler
formatting: blacken the codebase...
r43346
def createtextoutputframe(
    stream, requestid, atoms, maxframesize=DEFAULT_MAX_FRAME_SIZE
):
    """Create a text output frame to render text to people.

    ``atoms`` is an iterable of 3-tuples of (formatting string, args,
    labels).

    The formatting string contains ``%s`` tokens to be replaced by the
    corresponding indexed entry in ``args``. ``labels`` is an iterable of
    formatters to be applied at rendering time. In terms of the ``ui``
    class, each atom corresponds to a ``ui.write()``.

    This is a generator yielding a single frame. ``ValueError`` is raised
    if a formatting string, argument or label is not ``bytes``, or if the
    encoded atoms do not fit in ``maxframesize`` bytes. A label containing
    non-ASCII bytes raises ``UnicodeDecodeError``.
    """
    atomdicts = []

    for (formatting, args, labels) in atoms:
        # TODO look for localstr, other types here?

        if not isinstance(formatting, bytes):
            raise ValueError(b'must use bytes formatting strings')
        for arg in args:
            if not isinstance(arg, bytes):
                raise ValueError(b'must use bytes for arguments')
        for label in labels:
            if not isinstance(label, bytes):
                raise ValueError(b'must use bytes for labels')

        # Formatting string must be ASCII. Undecodable bytes become
        # U+FFFD on decode, which itself is not ASCII, so the encode step
        # must also use ``replace`` (yielding ``?``) or it would raise
        # UnicodeEncodeError.
        formatting = formatting.decode(r'ascii', r'replace').encode(
            r'ascii', r'replace'
        )

        # Arguments must be UTF-8.
        args = [a.decode(r'utf-8', r'replace').encode(r'utf-8') for a in args]

        # Labels must be ASCII.
        labels = [
            l.decode(r'ascii', r'strict').encode(r'ascii') for l in labels
        ]

        atom = {b'msg': formatting}
        if args:
            atom[b'args'] = args
        if labels:
            atom[b'labels'] = labels

        atomdicts.append(atom)

    payload = b''.join(cborutil.streamencode(atomdicts))

    if len(payload) > maxframesize:
        raise ValueError(b'cannot encode data in a single frame')

    yield stream.makeframe(
        requestid=requestid,
        typeid=FRAME_TYPE_TEXT_OUTPUT,
        flags=0,
        payload=payload,
    )
Gregory Szorc
wireproto: start to associate frame generation with a stream...
r37303
Gregory Szorc
wireprotoframing: buffer emitted data to reduce frame count...
class bufferingcommandresponseemitter(object):
    """Coalesce small command response chunks into larger frames.

    Command response data usually arrives in pieces far smaller than a
    frame can hold. Instances accumulate encoded pieces and only produce
    a frame once holding more would reach ``maxframesize``.

    TODO we'll need something like this when compression is supported.
    So it might make sense to implement this functionality at the stream
    level.
    """

    def __init__(self, stream, requestid, maxframesize=DEFAULT_MAX_FRAME_SIZE):
        self._stream = stream
        self._requestid = requestid
        self._maxsize = maxframesize
        # Encoded chunks awaiting emission and their combined length.
        self._chunks = []
        self._chunkssize = 0

    def _continuationframe(self, payload):
        """Build a command response continuation frame holding ``payload``.

        The frame is created with ``encoded=True``; callers pass data that
        already went through ``self._stream.encode()``.
        """
        return self._stream.makeframe(
            self._requestid,
            typeid=FRAME_TYPE_COMMAND_RESPONSE,
            flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
            payload=payload,
            encoded=True,
        )

    def send(self, data):
        """Feed new response data and yield any frames that became ready.

        This is a generator. Passing ``None`` flushes everything that is
        currently buffered. Any other value is run through the stream's
        encoder and then handled as follows:

        * An encoded chunk larger than a frame flushes the buffer and is
          then split across as many frames as needed.
        * An encoded chunk that keeps the buffered total below the frame
          size is buffered and nothing is yielded.
        * Otherwise the buffer is flushed and the chunk becomes the new
          buffer content.
        """
        if data is None:
            for frame in self._flush():
                yield frame
            return

        encoded = self._stream.encode(data)

        # Nothing to buffer or emit (the encoder may return an empty result).
        if not encoded:
            return

        size = len(encoded)

        if size > self._maxsize:
            # Preserve ordering: previously buffered data goes out first.
            for frame in self._flush():
                yield frame

            # Slice the oversized chunk into frame-sized pieces.
            offset = 0
            while offset < size:
                piece = encoded[offset : offset + self._maxsize]
                offset += len(piece)
                yield self._continuationframe(piece)
            return

        # Still room in the buffer: keep accumulating.
        if self._chunkssize + size < self._maxsize:
            self._chunks.append(encoded)
            self._chunkssize += size
            return

        # Buffer would reach the limit. Emit what we hold and start a new
        # buffer with this chunk. Splitting the chunk would pack frames
        # tighter, but simplicity wins for now.
        for frame in self._flush():
            yield frame

        self._chunks.append(encoded)
        self._chunkssize = size

    def _flush(self):
        """Yield a single frame with all buffered data, if there is any."""
        buffered = b''.join(self._chunks)
        assert len(buffered) <= self._maxsize

        del self._chunks[:]
        self._chunkssize = 0

        if buffered:
            yield self._continuationframe(buffered)
Gregory Szorc
wireprotoframing: buffer emitted data to reduce frame count...
r39596
Gregory Szorc
wireprotov2: define and use stream encoders...
r40167 # TODO consider defining encoders/decoders using the util.compressionengine
# mechanism.
Augie Fackler
formatting: blacken the codebase...
r43346
Gregory Szorc
wireprotov2: define and use stream encoders...
class identityencoder(object):
    """Encoder for the "identity" stream encoding profile.

    Data passes through untouched, so there is never anything pending
    to flush or finish.
    """

    def __init__(self, ui):
        # ``ui`` is accepted but not used by this encoder.
        pass

    def encode(self, data):
        """Return ``data`` unchanged."""
        return data

    def flush(self):
        """Return the (always empty) pending output."""
        return b''

    def finish(self):
        """Return the (always empty) trailing output."""
        return b''
Augie Fackler
formatting: blacken the codebase...
r43346
Gregory Szorc
wireprotov2: define and use stream encoders...
class identitydecoder(object):
    """Decoder for the "identity" stream encoding profile."""

    def __init__(self, ui, extraobjs):
        """Create the decoder.

        Raises ``error.Abort`` if ``extraobjs`` is non-empty, since this
        profile takes no additional settings values.
        """
        if not extraobjs:
            return

        raise error.Abort(
            _(b'identity decoder received unexpected additional values')
        )

    def decode(self, data):
        """Return ``data`` unchanged."""
        return data
Augie Fackler
formatting: blacken the codebase...
r43346
Gregory Szorc
wireprotov2: define and use stream encoders...
class zlibencoder(object):
    """Stream encoder backed by a single ``zlib`` compression object."""

    def __init__(self, ui):
        # Imported here rather than at module level; the module handle is
        # kept because the flush mode constants are needed later.
        import zlib

        self._zlib = zlib
        self._compressor = zlib.compressobj()

    def encode(self, data):
        """Feed ``data`` to the compressor and return what it emits."""
        compressed = self._compressor.compress(data)
        return compressed

    def flush(self):
        """Return pending compressed output without ending the stream."""
        # Z_SYNC_FLUSH doesn't reset compression context, which is
        # what we want.
        mode = self._zlib.Z_SYNC_FLUSH
        return self._compressor.flush(mode)

    def finish(self):
        """Terminate the compressed stream and return its final bytes.

        The compressor is dropped afterwards, so the encoder cannot be
        used again.
        """
        trailer = self._compressor.flush(self._zlib.Z_FINISH)
        self._compressor = None
        return trailer
Augie Fackler
formatting: blacken the codebase...
r43346
Gregory Szorc
wireprotov2: define and use stream encoders...
class zlibdecoder(object):
    """Stream decoder backed by a single ``zlib`` decompression object."""

    def __init__(self, ui, extraobjs):
        """Create the decoder.

        Raises ``error.Abort`` if ``extraobjs`` is non-empty, since this
        profile takes no additional settings values.
        """
        import zlib

        if extraobjs:
            raise error.Abort(
                _(b'zlib decoder received unexpected additional values')
            )

        self._decompressor = zlib.decompressobj()

    def decode(self, data):
        """Decompress ``data`` and return the resulting bytes."""
        # Python 2's zlib module doesn't use the buffer protocol and can't
        # handle all bytes-like types.
        needscopy = isinstance(data, bytearray) and not pycompat.ispy3
        if needscopy:
            data = bytes(data)

        return self._decompressor.decompress(data)
Augie Fackler
formatting: blacken the codebase...
r43346
Gregory Szorc
wireprotov2: define and use stream encoders...
class zstdbaseencoder(object):
    """Shared implementation for zstd-based stream encoders.

    ``level`` is forwarded to ``zstd.ZstdCompressor``.
    """

    def __init__(self, level):
        from . import zstd

        # Keep the module around: the flush mode constants live on it.
        self._zstd = zstd
        self._compressor = zstd.ZstdCompressor(level=level).compressobj()

    def encode(self, data):
        """Feed ``data`` to the compressor and return what it emits."""
        compressed = self._compressor.compress(data)
        return compressed

    def flush(self):
        """Return pending compressed output without ending the stream."""
        # COMPRESSOBJ_FLUSH_BLOCK flushes all data previously fed into the
        # compressor and allows a decompressor to access all encoded data
        # up to this point.
        mode = self._zstd.COMPRESSOBJ_FLUSH_BLOCK
        return self._compressor.flush(mode)

    def finish(self):
        """Finish the compressed stream and return its final bytes.

        The compressor is dropped afterwards, so the encoder cannot be
        used again.
        """
        trailer = self._compressor.flush(self._zstd.COMPRESSOBJ_FLUSH_FINISH)
        self._compressor = None
        return trailer
Augie Fackler
formatting: blacken the codebase...
r43346
Gregory Szorc
wireprotov2: define and use stream encoders...
class zstd8mbencoder(zstdbaseencoder):
    """zstd stream encoder that compresses at level 3.

    ``ui`` is accepted but not used.
    """

    def __init__(self, ui):
        super(zstd8mbencoder, self).__init__(level=3)
Augie Fackler
formatting: blacken the codebase...
r43346
Gregory Szorc
wireprotov2: define and use stream encoders...
class zstdbasedecoder(object):
    """Shared implementation for zstd-based stream decoders.

    ``maxwindowsize`` is forwarded to ``zstd.ZstdDecompressor`` as
    ``max_window_size``.
    """

    def __init__(self, maxwindowsize):
        from . import zstd

        context = zstd.ZstdDecompressor(max_window_size=maxwindowsize)
        self._decompressor = context.decompressobj()

    def decode(self, data):
        """Decompress ``data`` and return the resulting bytes."""
        decoded = self._decompressor.decompress(data)
        return decoded
Augie Fackler
formatting: blacken the codebase...
r43346
Gregory Szorc
wireprotov2: define and use stream encoders...
class zstd8mbdecoder(zstdbasedecoder):
    """zstd stream decoder limited to an 8 MiB window."""

    def __init__(self, ui, extraobjs):
        """Create the decoder.

        Raises ``error.Abort`` if ``extraobjs`` is non-empty, since this
        profile takes no additional settings values. ``ui`` is not used.
        """
        if not extraobjs:
            super(zstd8mbdecoder, self).__init__(
                maxwindowsize=8 * 1024 * 1024
            )
            return

        raise error.Abort(
            _(b'zstd8mb decoder received unexpected additional values')
        )
Augie Fackler
formatting: blacken the codebase...
r43346
Gregory Szorc
wireprotov2: define and use stream encoders...
r40167 # We lazily populate this to avoid excessive module imports when importing
# this module.
STREAM_ENCODERS = {}
STREAM_ENCODERS_ORDER = []
Augie Fackler
formatting: blacken the codebase...
r43346
Gregory Szorc
wireprotov2: define and use stream encoders...
def populatestreamencoders():
    """Fill ``STREAM_ENCODERS`` and ``STREAM_ENCODERS_ORDER`` on first use.

    Does nothing when ``STREAM_ENCODERS`` already has entries. The
    ``zstd-8mb`` profile is only registered when the bundled ``zstd``
    module can be imported.
    """
    if STREAM_ENCODERS:
        return

    try:
        from . import zstd

        # NOTE(review): presumably touches an attribute so that a lazily
        # loaded module is really imported here - confirm.
        zstd.__version__
    except ImportError:
        zstd = None

    profiles = []

    # zstandard is fastest and is preferred.
    if zstd:
        profiles.append((b'zstd-8mb', zstd8mbencoder, zstd8mbdecoder))

    profiles.append((b'zlib', zlibencoder, zlibdecoder))
    profiles.append((b'identity', identityencoder, identitydecoder))

    for name, encodercls, decodercls in profiles:
        STREAM_ENCODERS[name] = (encodercls, decodercls)
        STREAM_ENCODERS_ORDER.append(name)
Augie Fackler
formatting: blacken the codebase...
r43346
Gregory Szorc
wireproto: start to associate frame generation with a stream...
class stream(object):
    """Represents a logical unidirectional series of frames."""

    def __init__(self, streamid, active=False):
        self.streamid = streamid
        # Becomes True once the first frame for this stream was created.
        self._active = active

    def makeframe(self, requestid, typeid, flags, payload):
        """Create a frame to be sent out over this stream.

        Only returns the frame instance. Does not actually send it.

        The first frame created on an inactive stream carries
        ``STREAM_FLAG_BEGIN_STREAM`` and marks the stream active.
        """
        if self._active:
            streamflags = 0
        else:
            streamflags = STREAM_FLAG_BEGIN_STREAM
            self._active = True

        return makeframe(
            requestid, self.streamid, streamflags, typeid, flags, payload
        )
Gregory Szorc
wireproto: add streams to frame-based protocol...
r37304
Gregory Szorc
wireprotov2: establish dedicated classes for input and output streams...
class inputstream(stream):
    """Represents a stream used for receiving data."""

    def __init__(self, streamid, active=False):
        super(inputstream, self).__init__(streamid, active=active)
        # No decoder means data is passed through unchanged.
        self._decoder = None

    def setdecoder(self, ui, name, extraobjs):
        """Set the decoder for this stream.

        Receives the stream profile name and any additional CBOR objects
        decoded from the stream encoding settings frame payloads.

        Raises ``error.Abort`` when ``name`` is not a registered profile.
        """
        if name not in STREAM_ENCODERS:
            raise error.Abort(_(b'unknown stream decoder: %s') % name)

        # Index 1 of each registry entry is the decoder class.
        decodercls = STREAM_ENCODERS[name][1]
        self._decoder = decodercls(ui, extraobjs)

    def decode(self, data):
        """Return ``data`` run through the decoder, if one is set."""
        # Default is identity decoder. We don't bother instantiating one
        # because it is trivial.
        if self._decoder:
            return self._decoder.decode(data)

        return data

    def flush(self):
        """Return the decoder's flushed output, or ``b''`` without one."""
        # NOTE(review): none of the decoder classes visible next to this
        # class define ``flush()``; confirm this path is reachable/valid.
        if self._decoder:
            return self._decoder.flush()

        return b''
Gregory Szorc
wireprotov2: handle stream encoding settings frames...
r40164
Augie Fackler
formatting: blacken the codebase...
r43346
Gregory Szorc
wireprotov2: establish dedicated classes for input and output streams...
class outputstream(stream):
    """Represents a stream used for sending data."""

    def __init__(self, streamid, active=False):
        super(outputstream, self).__init__(streamid, active=active)
        # Set once a stream settings frame with the end-of-settings flag
        # has been created via ``makeframe()``.
        self.streamsettingssent = False
        # No encoder means data is passed through unchanged.
        self._encoder = None
        self._encodername = None

    def setencoder(self, ui, name):
        """Set the encoder for this stream.

        Receives the stream profile name.

        Raises ``error.Abort`` when ``name`` is not a registered profile.
        """
        if name not in STREAM_ENCODERS:
            raise error.Abort(_(b'unknown stream encoder: %s') % name)

        # Index 0 of each registry entry is the encoder class.
        self._encoder = STREAM_ENCODERS[name][0](ui)
        self._encodername = name

    def encode(self, data):
        """Return ``data`` run through the encoder, if one is set."""
        if not self._encoder:
            return data

        return self._encoder.encode(data)

    def flush(self):
        """Return the encoder's pending output, or ``b''`` without one."""
        if not self._encoder:
            return b''

        return self._encoder.flush()

    def finish(self):
        """Finish the encoder and return its final output.

        Returns ``b''`` when no encoder is set.
        """
        if not self._encoder:
            return b''

        # The encoder's trailing bytes must be handed back to the caller;
        # dropping them would truncate the encoded stream.
        return self._encoder.finish()

    def makeframe(self, requestid, typeid, flags, payload, encoded=False):
        """Create a frame to be sent out over this stream.

        Only returns the frame instance. Does not actually send it.

        ``encoded`` states that ``payload`` already went through this
        stream's encoder. Such frames get ``STREAM_FLAG_ENCODING_APPLIED``
        and may only be created after the stream settings were sent;
        otherwise ``error.ProgrammingError`` is raised.
        """
        streamflags = 0
        if not self._active:
            streamflags |= STREAM_FLAG_BEGIN_STREAM
            self._active = True

        if encoded:
            if not self.streamsettingssent:
                raise error.ProgrammingError(
                    b'attempting to send encoded frame without sending stream '
                    b'settings'
                )

            streamflags |= STREAM_FLAG_ENCODING_APPLIED

        # Remember that the final stream settings frame has been produced
        # so that encoded frames are allowed from now on.
        if (
            typeid == FRAME_TYPE_STREAM_SETTINGS
            and flags & FLAG_STREAM_ENCODING_SETTINGS_EOS
        ):
            self.streamsettingssent = True

        return makeframe(
            requestid, self.streamid, streamflags, typeid, flags, payload
        )

    def makestreamsettingsframe(self, requestid):
        """Create a stream settings frame for this stream.

        Returns frame data or None if no stream settings frame is needed or has
        already been sent.
        """
        if not self._encoder or self.streamsettingssent:
            return None

        # The payload is the CBOR encoding of the encoder profile name.
        payload = b''.join(cborutil.streamencode(self._encodername))
        return self.makeframe(
            requestid,
            FRAME_TYPE_STREAM_SETTINGS,
            FLAG_STREAM_ENCODING_SETTINGS_EOS,
            payload,
        )
Gregory Szorc
wireprotov2: send content encoded frames from server...
r40173
Gregory Szorc
wireproto: add streams to frame-based protocol...
def ensureserverstream(stream):
    """Check that ``stream`` has an even stream ID.

    Raises ``error.ProgrammingError`` for an odd ID; returns ``None``
    otherwise.
    """
    streamid = stream.streamid
    if not streamid % 2:
        return

    raise error.ProgrammingError(
        b'server should only write to even '
        b'numbered streams; %d is not even' % streamid
    )
Gregory Szorc
wireproto: define human output side channel frame...
r37078
Gregory Szorc
wireprotov2: handle sender protocol settings frames...
# Sender protocol settings assumed until told otherwise.
# ``serverreactor.__init__`` seeds its ``_sendersettings`` from a shallow
# copy (``dict(...)``) of this mapping, so the list value is shared with
# every such copy.
DEFAULT_PROTOCOL_SETTINGS = {
    b'contentencodings': [b'identity'],
}
Augie Fackler
formatting: blacken the codebase...
r43346
Gregory Szorc
wireproto: implement basic frame reading and processing...
r37070 class serverreactor(object):
"""Holds state of a server handling frame-based protocol requests.
This class is the "brain" of the unified frame-based protocol server
component. While the protocol is stateless from the perspective of
requests/commands, something needs to track which frames have been
received, what frames to expect, etc. This class is that thing.
Instances are modeled as a state machine of sorts. Instances are also
reactionary to external events. The point of this class is to encapsulate
the state of the connection and the exchange of frames, not to perform
work. Instead, callers tell this class when something occurs, like a
frame arriving. If that activity is worthy of a follow-up action (say
*run a command*), the return value of that handler will say so.
I/O and CPU intensive operations are purposefully delegated outside of
this class.
Consumers are expected to tell instances when events occur. They do so by
calling the various ``on*`` methods. These methods return a 2-tuple
describing any follow-up action(s) to take. The first element is the
name of an action to perform. The second is a data structure (usually
a dict) specific to that action that contains more information. e.g.
if the server wants to send frames back to the client, the data structure
will contain a reference to those frames.
Valid actions that consumers can be instructed to take are:
Gregory Szorc
wireproto: define and implement responses in framing protocol...
r37073 sendframes
Indicates that frames should be sent to the client. The ``framegen``
key contains a generator of frames that should be sent. The server
assumes that all frames are sent to the client.
Gregory Szorc
wireproto: implement basic frame reading and processing...
r37070 error
Indicates that an error occurred. Consumer should probably abort.
runcommand
Indicates that the consumer should run a wire protocol command. Details
of the command to run are given in the data structure.
wantframe
Indicates that nothing of interest happened and the server is waiting on
more frames from the client before anything interesting can be done.
Gregory Szorc
wireproto: buffer output frames when in half duplex mode...
r37074
noop
Indicates no additional action is required.
Gregory Szorc
wireproto: support for receiving multiple requests...
r37076
Known Issues
------------
There are no limits to the number of partially received commands or their
size. A malicious client could stream command request data and exhaust the
server's memory.
Partially received commands are not acted upon when end of input is
reached. Should the server error if it receives a partial request?
Should the client send a message to abort a partially transmitted request
to facilitate graceful shutdown?
Active requests that haven't been responded to aren't tracked. This means
that if we receive a command and instruct its dispatch, another command
with its request ID can come in over the wire and there will be a race
between who responds to what.
Gregory Szorc
wireproto: implement basic frame reading and processing...
r37070 """
Gregory Szorc
wireprotov2: pass ui into clientreactor and serverreactor...
def __init__(self, ui, deferoutput=False):
    """Construct a new server reactor.

    When ``deferoutput`` is true, nothing is handed to the consumer for
    sending until input has been exhausted: events that would normally
    produce output frames (such as a command response being ready) are
    held back instead. This suits half-duplex transports where the
    sender cannot receive until all data has been transmitted.
    """
    self._ui = ui
    self._deferoutput = deferoutput

    # State machine position and outgoing stream ID bookkeeping.
    self._state = b'initial'
    self._nextoutgoingstreamid = 2

    # Frame generators held back while output is deferred.
    self._bufferedframegens = []

    # stream id -> stream instance for all active streams from the client.
    self._incomingstreams = {}
    self._outgoingstreams = {}

    # request id -> dict of commands that are actively being received.
    self._receivingcommands = {}

    # Request IDs that have been received and are actively being processed.
    # Once all output for a request has been sent, it is removed from this
    # set.
    self._activecommands = set()

    self._protocolsettingsdecoder = None

    # Sender protocol settings are optional. Set implied default values.
    self._sendersettings = dict(DEFAULT_PROTOCOL_SETTINGS)

    # Make sure the stream encoder registry is filled in.
    populatestreamencoders()
Gregory Szorc
wireproto: define attr-based classes for representing frames...
def onframerecv(self, frame):
    """Process a frame that has been received off the wire.

    Validates the frame's stream, tracks incoming stream lifetime and
    then hands the frame to the handler for the current state. Returns
    that handler's result, or the result of ``_makeerrorresult()`` when
    the stream checks fail.
    """
    streamid = frame.streamid

    # Frames must arrive on odd numbered streams.
    if not streamid % 2:
        self._state = b'errored'
        return self._makeerrorresult(
            _(b'received frame with even numbered stream ID: %d')
            % frame.streamid
        )

    # A stream we have not seen yet must be opened explicitly.
    if streamid not in self._incomingstreams:
        beginsstream = frame.streamflags & STREAM_FLAG_BEGIN_STREAM
        if not beginsstream:
            self._state = b'errored'
            return self._makeerrorresult(
                _(
                    b'received frame on unknown inactive stream without '
                    b'beginning of stream flag set'
                )
            )

        self._incomingstreams[streamid] = inputstream(streamid)

    if frame.streamflags & STREAM_FLAG_ENCODING_APPLIED:
        # TODO handle decoding frames
        self._state = b'errored'
        raise error.ProgrammingError(
            b'support for decoding stream payloads not yet implemented'
        )

    # The sender is done with this stream; forget about it.
    if frame.streamflags & STREAM_FLAG_END_STREAM:
        del self._incomingstreams[streamid]

    # Dispatch on the current state.
    dispatch = {
        b'initial': self._onframeinitial,
        b'protocol-settings-receiving': self._onframeprotocolsettings,
        b'idle': self._onframeidle,
        b'command-receiving': self._onframecommandreceiving,
        b'errored': self._onframeerrored,
    }

    handler = dispatch.get(self._state)
    if not handler:
        raise error.ProgrammingError(b'unhandled state: %s' % self._state)

    return handler(frame)
Gregory Szorc
wireproto: implement basic frame reading and processing...
r37070
Gregory Szorc
wireprotov2: implement commands as a generator of objects...
def oncommandresponsereadyobjects(self, stream, requestid, objs):
    """Signal that objects are ready to be sent to the client.

    ``objs`` is an iterable of objects (typically a generator) that will
    be encoded via CBOR and added to frames, which will be sent to the
    client.

    ``stream`` must be a server (even numbered) stream; otherwise
    ``ensureserverstream()`` raises ``error.ProgrammingError``.

    Returns the result of ``self._handlesendframes()`` for the frame
    generator built here.
    """
    ensureserverstream(stream)

    # A more robust solution would be to check for objs.{next,__next__}.
    if isinstance(objs, list):
        objs = iter(objs)

    # We need to take care over exception handling. Uncaught exceptions
    # when generating frames could lead to premature end of the frame
    # stream and the possibility of the server or client process getting
    # in a bad state.
    #
    # Keep in mind that if ``objs`` is a generator, advancing it could
    # raise exceptions that originated in e.g. wire protocol command
    # functions. That is why we differentiate between exceptions raised
    # when iterating versus other exceptions that occur.
    #
    # In all cases, when the function finishes, the request is fully
    # handled and no new frames for it should be seen.
    def sendframes():
        emitted = False
        alternatelocationsent = False
        emitter = bufferingcommandresponseemitter(stream, requestid)
        while True:
            try:
                o = next(objs)
            except StopIteration:
                # Push out whatever the emitter still holds.
                for frame in emitter.send(None):
                    yield frame

                if emitted:
                    for frame in createcommandresponseeosframes(
                        stream, requestid
                    ):
                        yield frame
                break

            except error.WireprotoCommandError as e:
                for frame in createcommanderrorresponse(
                    stream, requestid, e.message, e.messageargs
                ):
                    yield frame
                break

            except Exception as e:
                for frame in createerrorframe(
                    stream,
                    requestid,
                    b'%s' % stringutil.forcebytestr(e),
                    errtype=b'server',
                ):
                    yield frame

                break

            try:
                # Alternate location responses can only be the first and
                # only object in the output stream.
                if isinstance(o, wireprototypes.alternatelocationresponse):
                    if emitted:
                        raise error.ProgrammingError(
                            b'alternatelocationresponse seen after initial '
                            b'output object'
                        )

                    frame = stream.makestreamsettingsframe(requestid)
                    if frame:
                        yield frame

                    yield createalternatelocationresponseframe(
                        stream, requestid, o
                    )

                    alternatelocationsent = True
                    emitted = True
                    continue

                if alternatelocationsent:
                    raise error.ProgrammingError(
                        b'object follows alternatelocationresponse'
                    )

                if not emitted:
                    # Frame is optional.
                    frame = stream.makestreamsettingsframe(requestid)
                    if frame:
                        yield frame

                    # May be None if empty frame (due to encoding).
                    frame = createcommandresponseokframe(stream, requestid)
                    if frame:
                        yield frame

                    emitted = True

                # Objects emitted by command functions can be serializable
                # data structures or special types.
                # TODO consider extracting the content normalization to a
                # standalone function, as it may be useful for e.g. cachers.

                # A pre-encoded object is sent directly to the emitter.
                if isinstance(o, wireprototypes.encodedresponse):
                    for frame in emitter.send(o.data):
                        yield frame

                elif isinstance(
                    o, wireprototypes.indefinitebytestringresponse
                ):
                    for chunk in cborutil.streamencodebytestringfromiter(
                        o.chunks
                    ):

                        for frame in emitter.send(chunk):
                            yield frame

                # A regular object is CBOR encoded.
                else:
                    for chunk in cborutil.streamencode(o):
                        for frame in emitter.send(chunk):
                            yield frame

            except Exception as e:
                # ``b'%s' % e`` raises TypeError on Python 3 for ordinary
                # exceptions, which would hide the real error. Convert
                # the exception to bytes the same way the handler for
                # iteration errors above does.
                for frame in createerrorframe(
                    stream,
                    requestid,
                    b'%s' % stringutil.forcebytestr(e),
                    errtype=b'server',
                ):
                    yield frame

                break

        # The request is fully handled once the generator runs to its end.
        self._activecommands.remove(requestid)

    return self._handlesendframes(sendframes())
Gregory Szorc
wireproto: buffer output frames when in half duplex mode...
def oninputeof(self):
    """Signals that end of input has been received.

    No more frames will be received. All pending activity should be
    completed.

    Returns ``(b'sendframes', {b'framegen': ...})`` when output was
    deferred and frame generators are buffered; ``(b'noop', {})``
    otherwise.
    """
    # TODO should we do anything about in-flight commands?
    if self._deferoutput and self._bufferedframegens:
        # If we buffered all our responses, emit those.
        def bufferedframes():
            for framegen in self._bufferedframegens:
                for frame in framegen:
                    yield frame

        return b'sendframes', {b'framegen': bufferedframes()}

    return b'noop', {}
Gregory Szorc
wireproto: define and implement responses in framing protocol...
r37073
Gregory Szorc
wireprotov2: add support for more response types...
r37746 def _handlesendframes(self, framegen):
if self._deferoutput:
self._bufferedframegens.append(framegen)
Augie Fackler
formatting: byteify all mercurial/ and hgext/ string literals...
r43347 return b'noop', {}
Gregory Szorc
wireprotov2: add support for more response types...
r37746 else:
Augie Fackler
formatting: byteify all mercurial/ and hgext/ string literals...
r43347 return b'sendframes', {b'framegen': framegen,}
Gregory Szorc
wireprotov2: add support for more response types...
r37746
Gregory Szorc
wireprotov2: change behavior of error frame...
def onservererror(self, stream, requestid, msg):
    """Report a server-level error for ``requestid`` to the peer.

    Returns the (action, action data) tuple produced by
    ``_handlesendframes()`` for a generator of error frames created with
    ``errtype=b'server'``.
    """
    ensureserverstream(stream)

    def errorframes():
        framegen = createerrorframe(
            stream, requestid, msg, errtype=b'server'
        )
        for frame in framegen:
            yield frame

        # Only once every frame has been consumed is the request dropped
        # from the set of active commands.
        self._activecommands.remove(requestid)

    return self._handlesendframes(errorframes())
def oncommanderror(self, stream, requestid, message, args=None):
    """Called when a command encountered an error before sending output.

    Returns the (action, action data) tuple produced by
    ``_handlesendframes()`` for a generator of command error response
    frames built from ``message`` and ``args``.
    """
    ensureserverstream(stream)

    def errorframes():
        framegen = createcommanderrorresponse(
            stream, requestid, message, args
        )
        for frame in framegen:
            yield frame

        # Only once every frame has been consumed is the request dropped
        # from the set of active commands.
        self._activecommands.remove(requestid)

    return self._handlesendframes(errorframes())
Gregory Szorc
wireproto: define and implement responses in framing protocol...
r37073
Gregory Szorc
wireproto: explicit API to create outgoing streams...
def makeoutputstream(self):
    """Create a stream to be used for sending data to the client.

    If this is called before protocol settings frames are received, we
    don't know what stream encodings are supported by the client and
    we will default to identity.
    """
    streamid = self._nextoutgoingstreamid
    self._nextoutgoingstreamid = streamid + 2

    stream = outputstream(streamid)
    self._outgoingstreams[streamid] = stream

    # Always use the *server's* preferred encoder over the client's,
    # as servers have more to lose from sub-optimal encoders being used.
    # The first entry of the server's ordering that the sender also
    # advertised wins.
    chosen = next(
        (
            name
            for name in STREAM_ENCODERS_ORDER
            if name in self._sendersettings[b'contentencodings']
        ),
        None,
    )
    if chosen is not None:
        stream.setencoder(self._ui, chosen)

    return stream
Gregory Szorc
wireproto: implement basic frame reading and processing...
r37070 def _makeerrorresult(self, msg):
Augie Fackler
formatting: byteify all mercurial/ and hgext/ string literals...
r43347 return b'error', {b'message': msg,}
Gregory Szorc
wireproto: implement basic frame reading and processing...
r37070
Gregory Szorc
wireproto: support for receiving multiple requests...
def _makeruncommandresult(self, requestid):
    """Turn a fully received command request into a ``runcommand`` action.

    The request is removed from the set of receiving commands, its
    buffered payload is decoded as CBOR, and the request is recorded as
    active.

    Returns a ``runcommand`` action whose data dict carries the request
    ID, command name, arguments, redirect settings and any buffered
    command data. If the payload cannot be decoded, is empty, is not a
    map, or lacks a ``name`` field, the reactor moves to the ``errored``
    state and an ``error`` action is returned instead.

    Raises ``error.ProgrammingError`` if called before the request has
    been fully received.
    """
    entry = self._receivingcommands[requestid]

    if not entry[b'requestdone']:
        self._state = b'errored'
        raise error.ProgrammingError(
            b'should not be called without requestdone set'
        )

    del self._receivingcommands[requestid]

    if self._receivingcommands:
        self._state = b'command-receiving'
    else:
        self._state = b'idle'

    # Decode the payloads as CBOR. The payload comes from the remote
    # peer, so malformed input must produce a protocol error rather than
    # an uncaught exception.
    entry[b'payload'].seek(0)
    try:
        decoded = cborutil.decodeall(entry[b'payload'].getvalue())
    except error.CBORDecodeError as e:
        self._state = b'errored'
        return self._makeerrorresult(
            _(b'error decoding CBOR from command request: %s')
            % stringutil.forcebytestr(e)
        )

    if not decoded:
        self._state = b'errored'
        return self._makeerrorresult(
            _(b'command request did not contain CBOR data')
        )

    request = decoded[0]

    # The code below relies on mapping operations (``in``, item
    # assignment, ``.get()``), so reject any other CBOR value up front.
    if not isinstance(request, dict):
        self._state = b'errored'
        return self._makeerrorresult(
            _(b'command request is not a CBOR map')
        )

    if b'name' not in request:
        self._state = b'errored'
        return self._makeerrorresult(
            _(b'command request missing "name" field')
        )

    if b'args' not in request:
        request[b'args'] = {}

    assert requestid not in self._activecommands
    self._activecommands.add(requestid)

    return (
        b'runcommand',
        {
            b'requestid': requestid,
            b'command': request[b'name'],
            b'args': request[b'args'],
            b'redirect': request.get(b'redirect'),
            b'data': entry[b'data'].getvalue() if entry[b'data'] else None,
        },
    )
Gregory Szorc
wireproto: implement basic frame reading and processing...
r37070
def _makewantframeresult(self):
Augie Fackler
formatting: byteify all mercurial/ and hgext/ string literals...
r43347 return b'wantframe', {b'state': self._state,}
Gregory Szorc
wireproto: implement basic frame reading and processing...
r37070
Gregory Szorc
wireproto: use CBOR for command requests...
def _validatecommandrequestframe(self, frame):
    """Check the new/continuation flags of a command request frame.

    Exactly one of the two flags must be set. Returns ``None`` when the
    frame is acceptable; otherwise moves the reactor to the ``errored``
    state and returns an ``error`` action.
    """
    isnew = frame.flags & FLAG_COMMAND_REQUEST_NEW
    iscontinuation = frame.flags & FLAG_COMMAND_REQUEST_CONTINUATION

    if isnew and iscontinuation:
        self._state = b'errored'
        message = _(
            b'received command request frame with both new and '
            b'continuation flags set'
        )
    elif not (isnew or iscontinuation):
        self._state = b'errored'
        message = _(
            b'received command request frame with neither new nor '
            b'continuation flags set'
        )
    else:
        return None

    return self._makeerrorresult(message)
Gregory Szorc
wireproto: use CBOR for command requests...
r37308
Gregory Szorc
wireprotov2: handle sender protocol settings frames...
def _onframeinitial(self, frame):
    """Handle a frame received while in the "initial" state.

    A sender protocol settings frame switches to the
    ``protocol-settings-receiving`` state, and a command request frame
    switches to ``idle``; both are then processed by the handler for the
    new state. Any other frame type is a protocol error.
    """
    typeid = frame.typeid

    if typeid == FRAME_TYPE_SENDER_PROTOCOL_SETTINGS:
        self._state = b'protocol-settings-receiving'
        self._protocolsettingsdecoder = cborutil.bufferingdecoder()
        return self._onframeprotocolsettings(frame)

    if typeid == FRAME_TYPE_COMMAND_REQUEST:
        self._state = b'idle'
        return self._onframeidle(frame)

    self._state = b'errored'
    message = (
        _(
            b'expected sender protocol settings or command request '
            b'frame; got %d'
        )
        % frame.typeid
    )
    return self._makeerrorresult(message)
Gregory Szorc
wireprotov2: handle sender protocol settings frames...
r40162
def _onframeprotocolsettings(self, frame):
    """Handle a frame while sender protocol settings are being received.

    Each frame's payload is fed to the buffering CBOR decoder. A
    continuation frame asks for more frames. An end-of-stream frame
    requires exactly one decoded value, from which ``contentencodings``
    (when present) is copied into the sender settings before the reactor
    moves to the ``idle`` state. Any violation moves the reactor to the
    ``errored`` state and returns an ``error`` action.
    """
    assert self._state == b'protocol-settings-receiving'
    assert self._protocolsettingsdecoder is not None

    if frame.typeid != FRAME_TYPE_SENDER_PROTOCOL_SETTINGS:
        self._state = b'errored'
        message = (
            _(b'expected sender protocol settings frame; got %d')
            % frame.typeid
        )
        return self._makeerrorresult(message)

    flags = frame.flags
    continuation = flags & FLAG_SENDER_PROTOCOL_SETTINGS_CONTINUATION
    endofstream = flags & FLAG_SENDER_PROTOCOL_SETTINGS_EOS

    # Exactly one of the two flags has to be present.
    if continuation and endofstream:
        self._state = b'errored'
        return self._makeerrorresult(
            _(
                b'sender protocol settings frame cannot have both '
                b'continuation and end of stream flags set'
            )
        )

    if not (continuation or endofstream):
        self._state = b'errored'
        return self._makeerrorresult(
            _(
                b'sender protocol settings frame must have continuation or '
                b'end of stream flag set'
            )
        )

    # TODO establish limits for maximum amount of data that can be
    # buffered.
    try:
        self._protocolsettingsdecoder.decode(frame.payload)
    except Exception as e:
        self._state = b'errored'
        message = _(
            b'error decoding CBOR from sender protocol settings frame: %s'
        ) % stringutil.forcebytestr(e)
        return self._makeerrorresult(message)

    if continuation:
        return self._makewantframeresult()

    assert endofstream

    values = self._protocolsettingsdecoder.getavailable()
    self._protocolsettingsdecoder = None

    if not values:
        self._state = b'errored'
        return self._makeerrorresult(
            _(b'sender protocol settings frame did not contain CBOR data')
        )

    if len(values) > 1:
        self._state = b'errored'
        return self._makeerrorresult(
            _(
                b'sender protocol settings frame contained multiple CBOR '
                b'values'
            )
        )

    settings = values[0]

    if b'contentencodings' in settings:
        encodings = settings[b'contentencodings']
        self._sendersettings[b'contentencodings'] = encodings

    self._state = b'idle'

    return self._makewantframeresult()
Gregory Szorc
wireproto: define attr-based classes for representing frames...
def _onframeidle(self, frame):
    """Handle a frame that should start a new command request.

    The frame must be a command request frame with the "new" flag set
    whose request ID is neither being received nor active. Its payload
    is buffered under that request ID. If no further request frames or
    command data are expected the command is dispatched right away;
    otherwise the reactor moves to the ``command-receiving`` state and
    asks for another frame.
    """
    # The only frame type that should be received in this state is a
    # command request.
    if frame.typeid != FRAME_TYPE_COMMAND_REQUEST:
        self._state = b'errored'
        message = _(b'expected command request frame; got %d') % frame.typeid
        return self._makeerrorresult(message)

    invalid = self._validatecommandrequestframe(frame)
    if invalid:
        return invalid

    requestid = frame.requestid

    if requestid in self._receivingcommands:
        self._state = b'errored'
        message = _(b'request with ID %d already received') % requestid
        return self._makeerrorresult(message)

    if requestid in self._activecommands:
        self._state = b'errored'
        message = _(b'request with ID %d is already active') % requestid
        return self._makeerrorresult(message)

    flags = frame.flags
    isnew = flags & FLAG_COMMAND_REQUEST_NEW
    moreframes = flags & FLAG_COMMAND_REQUEST_MORE_FRAMES
    expectingdata = flags & FLAG_COMMAND_REQUEST_EXPECT_DATA

    if not isnew:
        self._state = b'errored'
        return self._makeerrorresult(
            _(b'received command request frame without new flag set')
        )

    # Later continuation frames append to this buffer, so it is created
    # empty and written to instead of being seeded with the payload.
    payloadbuf = util.bytesio()
    payloadbuf.write(frame.payload)

    self._receivingcommands[requestid] = {
        b'payload': payloadbuf,
        b'data': None,
        b'requestdone': not moreframes,
        b'expectingdata': bool(expectingdata),
    }

    # This is the final frame for this request. Dispatch it.
    if not (moreframes or expectingdata):
        return self._makeruncommandresult(requestid)

    assert moreframes or expectingdata
    self._state = b'command-receiving'
    return self._makewantframeresult()
Gregory Szorc
wireproto: implement basic frame reading and processing...
r37070
Gregory Szorc
wireproto: define attr-based classes for representing frames...
def _onframecommandreceiving(self, frame):
    """Handle a frame while at least one command is still being received.

    A command request frame with the "new" flag is processed like a
    frame in the idle state. Every other frame must belong to a request
    that is being received and is not active: a command request frame
    extends the buffered request payload, and a command data frame is
    passed on to ``_handlecommanddataframe()``. Anything else moves the
    reactor to the ``errored`` state and returns an ``error`` action.
    """
    isrequest = frame.typeid == FRAME_TYPE_COMMAND_REQUEST

    if isrequest:
        # Process new command requests as such.
        if frame.flags & FLAG_COMMAND_REQUEST_NEW:
            return self._onframeidle(frame)

        invalid = self._validatecommandrequestframe(frame)
        if invalid:
            return invalid

    requestid = frame.requestid

    # All other frames should be related to a command that is currently
    # receiving but is not active.
    if requestid in self._activecommands:
        self._state = b'errored'
        message = (
            _(b'received frame for request that is still active: %d')
            % requestid
        )
        return self._makeerrorresult(message)

    if requestid not in self._receivingcommands:
        self._state = b'errored'
        message = (
            _(b'received frame for request that is not receiving: %d')
            % requestid
        )
        return self._makeerrorresult(message)

    entry = self._receivingcommands[requestid]

    if isrequest:
        flags = frame.flags
        moreframes = flags & FLAG_COMMAND_REQUEST_MORE_FRAMES
        expectingdata = bool(flags & FLAG_COMMAND_REQUEST_EXPECT_DATA)

        if entry[b'requestdone']:
            self._state = b'errored'
            return self._makeerrorresult(
                _(
                    b'received command request frame when request frames '
                    b'were supposedly done'
                )
            )

        if expectingdata != entry[b'expectingdata']:
            self._state = b'errored'
            return self._makeerrorresult(
                _(b'mismatch between expect data flag and previous frame')
            )

        entry[b'payload'].write(frame.payload)

        if moreframes:
            return self._makewantframeresult()

        # This was the last request frame. Dispatch unless command data
        # still has to arrive.
        entry[b'requestdone'] = True
        if expectingdata:
            return self._makewantframeresult()

        return self._makeruncommandresult(requestid)

    if frame.typeid != FRAME_TYPE_COMMAND_DATA:
        self._state = b'errored'
        message = _(b'received unexpected frame type: %d') % frame.typeid
        return self._makeerrorresult(message)

    if not entry[b'expectingdata']:
        self._state = b'errored'
        message = (
            _(
                b'received command data frame for request that is not '
                b'expecting data: %d'
            )
            % requestid
        )
        return self._makeerrorresult(message)

    # The data buffer is created when the first data frame arrives.
    if entry[b'data'] is None:
        entry[b'data'] = util.bytesio()

    return self._handlecommanddataframe(frame, entry)
Gregory Szorc
wireproto: implement basic frame reading and processing...
r37070
Gregory Szorc
wireproto: define attr-based classes for representing frames...
def _handlecommanddataframe(self, frame, entry):
    """Buffer a command data frame and decide what happens next.

    The payload is appended to the request's data buffer. A continuation
    flag asks for another frame, an end-of-stream flag dispatches the
    command, and a frame with neither flag moves the reactor to the
    ``errored`` state and returns an ``error`` action.
    """
    assert frame.typeid == FRAME_TYPE_COMMAND_DATA

    # TODO support streaming data instead of buffering it.
    databuf = entry[b'data']
    databuf.write(frame.payload)

    flags = frame.flags

    if flags & FLAG_COMMAND_DATA_CONTINUATION:
        return self._makewantframeresult()

    if flags & FLAG_COMMAND_DATA_EOS:
        databuf.seek(0)
        return self._makeruncommandresult(frame.requestid)

    self._state = b'errored'
    return self._makeerrorresult(_(b'command data frame without flags'))
Gregory Szorc
wireproto: implement basic frame reading and processing...
r37070
Gregory Szorc
wireproto: define attr-based classes for representing frames...
def _onframeerrored(self, frame):
    """Answer any frame with an ``error`` action once the reactor errored."""
    message = _(b'server already errored')
    return self._makeerrorresult(message)
Gregory Szorc
wireproto: introduce a reactor for client-side state...
r37561
Augie Fackler
formatting: blacken the codebase...
r43346
Gregory Szorc
wireproto: introduce a reactor for client-side state...
class commandrequest(object):
    """Represents a request to run a command.

    Stores the request ID, command name, arguments, optional data file
    object and optional redirect settings exactly as passed in. ``state``
    starts out as ``b'pending'``.
    """

    def __init__(self, requestid, name, args, datafh=None, redirect=None):
        # A freshly created request has not been sent yet.
        self.state = b'pending'

        self.requestid = requestid
        self.name = name
        self.args = args
        self.datafh = datafh
        self.redirect = redirect
Gregory Szorc
wireproto: introduce a reactor for client-side state...
r37561
Augie Fackler
formatting: blacken the codebase...
r43346
Gregory Szorc
wireproto: introduce a reactor for client-side state...
r37561 class clientreactor(object):
"""Holds state of a client issuing frame-based protocol requests.
This is like ``serverreactor`` but for client-side state.
Each instance is bound to the lifetime of a connection. For persistent
connection transports using e.g. TCP sockets and speaking the raw
framing protocol, there will be a single instance for the lifetime of
the TCP socket. For transports where there are multiple discrete
interactions (say tunneled within an HTTP request), there will be a
separate instance for each distinct interaction.
Gregory Szorc
wireprotov2: document client reactor actions...
r40163
Consumers are expected to tell instances when events occur by calling
various methods. These methods return a 2-tuple describing any follow-up
action(s) to take. The first element is the name of an action to
perform. The second is a data structure (usually a dict) specific to
that action that contains more information. e.g. if the reactor wants
to send frames to the server, the data structure will contain a reference
to those frames.
Valid actions that consumers can be instructed to take are:
noop
Indicates no additional action is required.
sendframes
Indicates that frames should be sent to the server. The ``framegen``
key contains a generator of frames that should be sent. The reactor
assumes that all frames in this generator are sent to the server.
error
Indicates that an error occurred. The ``message`` key contains an
error message describing the failure.
responsedata
Indicates a response to a previously-issued command was received.
The ``request`` key contains the ``commandrequest`` instance that
represents the request this data is for.
The ``data`` key contains the decoded data from the server.
``expectmore`` and ``eos`` evaluate to True when more response data
is expected to follow or we're at the end of the response stream,
respectively.
Gregory Szorc
wireproto: introduce a reactor for client-side state...
r37561 """
Augie Fackler
formatting: blacken the codebase...
r43346
def __init__(
    self,
    ui,
    hasmultiplesend=False,
    buffersends=True,
    clientcontentencoders=None,
):
    """Create a new instance.

    ``hasmultiplesend`` indicates whether multiple sends are supported
    by the transport. When True, it is possible to send commands
    immediately instead of buffering until the caller signals an intent
    to finish a send operation.

    ``buffersends`` indicates whether sends should be buffered until the
    last request has been issued.

    ``clientcontentencoders`` is an iterable of content encoders the
    client will advertise to the server and that the server can use for
    encoding data. If not defined, the client will not advertise content
    encoders to the server.
    """
    # Construction-time configuration.
    self._ui = ui
    self._hasmultiplesend = hasmultiplesend
    self._buffersends = buffersends
    self._clientcontentencoders = clientcontentencoders

    # Nothing has been sent yet, so everything is still permitted.
    self._canissuecommands = self._cansend = True
    self._protocolsettingssent = False

    # Request bookkeeping.
    self._nextrequestid = 1
    self._pendingrequests = collections.deque()
    self._activerequests = {}

    # Stream bookkeeping.
    # We only support a single outgoing stream for now.
    self._outgoingstream = outputstream(1)
    self._incomingstreams = {}
    self._streamsettingsdecoders = {}

    populatestreamencoders()
Gregory Szorc
wireprotov2: client support for advertising redirect targets...
def callcommand(self, name, args, datafh=None, redirect=None):
    """Request that a command be executed.

    Receives the command name, a dict of arguments to pass to the
    command, an optional file object containing the raw data for the
    command, and optional redirect settings.

    Returns a 3-tuple of (request, action, action data). When sends are
    buffered the request is queued and the action is ``noop``; otherwise
    the action is ``sendframes`` with a ``framegen`` entry.

    Raises ``error.ProgrammingError`` if new commands can no longer be
    issued, or if an unbuffered send is attempted on an instance that
    can no longer send.
    """
    if not self._canissuecommands:
        raise error.ProgrammingError(b'cannot issue new commands')

    requestid = self._nextrequestid
    self._nextrequestid = requestid + 2

    request = commandrequest(
        requestid, name, args, datafh=datafh, redirect=redirect
    )

    if self._buffersends:
        self._pendingrequests.append(request)
        return request, b'noop', {}

    if not self._cansend:
        raise error.ProgrammingError(
            b'sends cannot be performed on this instance'
        )

    # Without support for multiple sends this was the only send allowed.
    if not self._hasmultiplesend:
        self._cansend = False
        self._canissuecommands = False

    framegen = self._makecommandframes(request)
    return request, b'sendframes', {b'framegen': framegen}
Gregory Szorc
wireproto: introduce a reactor for client-side state...
r37561
def flushcommands(self):
    """Request that all queued commands be sent.

    If any commands are buffered, this will instruct the caller to send
    them over the wire. If no commands are buffered it instructs the client
    to no-op.

    If instances aren't configured for multiple sends, no new command
    requests are allowed after this is called.

    Returns a 2-tuple of (action, action data). The action is ``noop``
    with an empty dict when nothing is queued, otherwise ``sendframes``
    with a ``framegen`` generator. The queue is drained lazily, as that
    generator is consumed.

    Raises ``error.ProgrammingError`` if commands are queued but this
    instance is not allowed to send.
    """
    if not self._pendingrequests:
        return b'noop', {}

    if not self._cansend:
        raise error.ProgrammingError(
            b'sends cannot be performed on this instance'
        )

    # If the instance only allows sending once, mark that we have fired
    # our one shot.
    if not self._hasmultiplesend:
        self._canissuecommands = False
        self._cansend = False

    def makeframes():
        # Pop requests in FIFO order and emit each one's frames in turn.
        while self._pendingrequests:
            request = self._pendingrequests.popleft()
            for frame in self._makecommandframes(request):
                yield frame

    return b'sendframes', {b'framegen': makeframes()}
Gregory Szorc
wireproto: introduce a reactor for client-side state...
r37561
def _makecommandframes(self, request):
    """Emit frames to issue a command request.

    As a side-effect, update request accounting to reflect its changed
    state: the request is registered as active and marked ``sending``
    when the generator starts, and marked ``sent`` once every frame has
    been yielded.

    The first time this runs on an instance that has client content
    encoders configured, a sender protocol settings frame advertising
    those encoders is emitted ahead of the command frames.
    """
    self._activerequests[request.requestid] = request
    request.state = b'sending'

    if not self._protocolsettingssent and self._clientcontentencoders:
        # Flip the flag first so the settings frame goes out only once
        # per reactor.
        self._protocolsettingssent = True

        payload = b''.join(
            cborutil.streamencode(
                {b'contentencodings': self._clientcontentencoders}
            )
        )

        yield self._outgoingstream.makeframe(
            requestid=request.requestid,
            typeid=FRAME_TYPE_SENDER_PROTOCOL_SETTINGS,
            flags=FLAG_SENDER_PROTOCOL_SETTINGS_EOS,
            payload=payload,
        )

    res = createcommandframes(
        self._outgoingstream,
        request.requestid,
        request.name,
        request.args,
        datafh=request.datafh,
        redirect=request.redirect,
    )

    for frame in res:
        yield frame

    # Only reached when the consumer has exhausted the generator.
    request.state = b'sent'
Gregory Szorc
wireproto: client reactor support for receiving frames...
r37562
def onframerecv(self, frame):
    """Process a frame that has been received off the wire.

    Returns a 2-tuple of (action, meta) describing further action the
    caller needs to take as a result of receiving this frame.

    An ``error`` action (with a ``message`` entry) is returned when the
    frame has an odd stream ID, arrives on an unknown stream without the
    beginning-of-stream flag, or refers to a request ID that is not
    active. Stream settings frames are handled by
    ``_onstreamsettingsframe``; all other frames are dispatched on their
    type ID.

    Raises ``error.ProgrammingError`` for a frame type that has no
    handler.
    """
    if frame.streamid % 2:
        return (
            b'error',
            {
                b'message': (
                    _(b'received frame with odd numbered stream ID: %d')
                    % frame.streamid
                ),
            },
        )

    if frame.streamid not in self._incomingstreams:
        # First frame seen on this stream: it must announce the stream.
        if not frame.streamflags & STREAM_FLAG_BEGIN_STREAM:
            return (
                b'error',
                {
                    b'message': _(
                        b'received frame on unknown stream '
                        b'without beginning of stream flag set'
                    ),
                },
            )

        self._incomingstreams[frame.streamid] = inputstream(frame.streamid)

    stream = self._incomingstreams[frame.streamid]

    # If the payload is encoded, ask the stream to decode it. We
    # merely substitute the decoded result into the frame payload as
    # if it had been transferred all along.
    if frame.streamflags & STREAM_FLAG_ENCODING_APPLIED:
        frame.payload = stream.decode(frame.payload)

    if frame.streamflags & STREAM_FLAG_END_STREAM:
        del self._incomingstreams[frame.streamid]

    # NOTE(review): if a stream settings frame also carries the
    # end-of-stream flag, the stream was just removed above, so the
    # stream lookup in _onstreamsettingsframe would fail and come back
    # as an "error setting stream decoder" action. Confirm this is the
    # intended outcome.
    if frame.typeid == FRAME_TYPE_STREAM_SETTINGS:
        return self._onstreamsettingsframe(frame)

    if frame.requestid not in self._activerequests:
        return (
            b'error',
            {
                b'message': (
                    _(b'received frame for inactive request ID: %d')
                    % frame.requestid
                ),
            },
        )

    request = self._activerequests[frame.requestid]
    request.state = b'receiving'

    handlers = {
        FRAME_TYPE_COMMAND_RESPONSE: self._oncommandresponseframe,
        FRAME_TYPE_ERROR_RESPONSE: self._onerrorresponseframe,
    }

    meth = handlers.get(frame.typeid)
    if not meth:
        raise error.ProgrammingError(
            b'unhandled frame type: %d' % frame.typeid
        )

    return meth(request, frame)
Gregory Szorc
wireprotov2: handle stream encoding settings frames...
def _onstreamsettingsframe(self, frame):
    """Handle a stream settings frame for an incoming stream.

    The frame payload is fed to a per-stream buffering CBOR decoder.
    Exactly one of the continuation / end-of-settings flags must be set:

    * continuation: the payload is buffered and ``noop`` is returned;
    * end of settings: the buffered values are taken, the first one and
      the remaining ones are passed to the stream's ``setdecoder()``,
      and ``noop`` is returned.

    Returns a 2-tuple of (action, meta). An ``error`` action with a
    ``message`` entry is returned for an invalid flag combination, a
    CBOR decoding failure, an end-of-settings frame that yielded no
    values, or a failure while installing the stream decoder.
    """
    assert frame.typeid == FRAME_TYPE_STREAM_SETTINGS

    more = frame.flags & FLAG_STREAM_ENCODING_SETTINGS_CONTINUATION
    eos = frame.flags & FLAG_STREAM_ENCODING_SETTINGS_EOS

    if more and eos:
        return (
            b'error',
            {
                b'message': (
                    _(
                        b'stream encoding settings frame cannot have both '
                        b'continuation and end of stream flags set'
                    )
                ),
            },
        )

    if not more and not eos:
        return (
            b'error',
            {
                b'message': _(
                    b'stream encoding settings frame must have '
                    b'continuation or end of stream flag set'
                ),
            },
        )

    # One decoder per stream, created on the first settings frame and
    # kept across continuation frames.
    if frame.streamid not in self._streamsettingsdecoders:
        decoder = cborutil.bufferingdecoder()
        self._streamsettingsdecoders[frame.streamid] = decoder

    decoder = self._streamsettingsdecoders[frame.streamid]

    try:
        decoder.decode(frame.payload)
    except Exception as e:
        return (
            b'error',
            {
                b'message': (
                    _(
                        b'error decoding CBOR from stream encoding '
                        b'settings frame: %s'
                    )
                    % stringutil.forcebytestr(e)
                ),
            },
        )

    if more:
        return b'noop', {}

    assert eos

    # Final settings frame: collect what was decoded and retire the
    # per-stream decoder.
    decoded = decoder.getavailable()
    del self._streamsettingsdecoders[frame.streamid]

    if not decoded:
        return (
            b'error',
            {
                b'message': _(
                    b'stream encoding settings frame did not contain '
                    b'CBOR data'
                ),
            },
        )

    try:
        self._incomingstreams[frame.streamid].setdecoder(
            self._ui, decoded[0], decoded[1:]
        )
    except Exception as e:
        return (
            b'error',
            {
                b'message': (
                    _(b'error setting stream decoder: %s')
                    % stringutil.forcebytestr(e)
                ),
            },
        )

    return b'noop', {}
Gregory Szorc
wireprotov2: handle stream encoding settings frames...
r40164
Gregory Szorc
wireprotov2: change frame type and name for command response...
def _oncommandresponseframe(self, request, frame):
    """Handle a command response frame for an active request.

    When the frame carries the end-of-response flag, the request is
    marked ``received`` and removed from the active requests.

    Returns a ``responsedata`` action whose meta dict holds the request,
    the frame payload under ``data``, and ``expectmore`` / ``eos``
    entries. Those two are the masked flag values (integers), not
    booleans.
    """
    if frame.flags & FLAG_COMMAND_RESPONSE_EOS:
        request.state = b'received'
        del self._activerequests[request.requestid]

    return (
        b'responsedata',
        {
            b'request': request,
            b'expectmore': frame.flags & FLAG_COMMAND_RESPONSE_CONTINUATION,
            b'eos': frame.flags & FLAG_COMMAND_RESPONSE_EOS,
            b'data': frame.payload,
        },
    )
Gregory Szorc
wireprotov2: change behavior of error frame...
r37744
def _onerrorresponseframe(self, request, frame):
    """Handle an error response frame for an active request.

    The request is marked ``errored`` and removed from the active
    requests before the payload is decoded.

    Returns an ``error`` action whose meta dict holds the request plus
    the ``type`` and ``message`` entries taken from the decoded payload.
    """
    request.state = b'errored'
    del self._activerequests[request.requestid]

    # The payload should be a CBOR map.
    # NOTE(review): nothing is validated here; a payload that does not
    # decode to a map containing b'type' and b'message' will raise out
    # of this method instead of producing an error action.
    m = cborutil.decodeall(frame.payload)[0]

    return (
        b'error',
        {
            b'request': request,
            b'type': m[b'type'],
            b'message': m[b'message'],
        },
    )