# wireprotoframing.py - unified framing protocol for wire protocol # # Copyright 2018 Gregory Szorc # # This software may be used and distributed according to the terms of the # GNU General Public License version 2 or any later version. # This file contains functionality to support the unified frame-based wire # protocol. For details about the protocol, see # `hg help internals.wireprotocol`. from __future__ import absolute_import import struct from .i18n import _ from .thirdparty import ( attr, ) from . import ( error, util, ) from .utils import ( stringutil, ) FRAME_HEADER_SIZE = 6 DEFAULT_MAX_FRAME_SIZE = 32768 FRAME_TYPE_COMMAND_NAME = 0x01 FRAME_TYPE_COMMAND_ARGUMENT = 0x02 FRAME_TYPE_COMMAND_DATA = 0x03 FRAME_TYPE_BYTES_RESPONSE = 0x04 FRAME_TYPE_ERROR_RESPONSE = 0x05 FRAME_TYPE_TEXT_OUTPUT = 0x06 FRAME_TYPES = { b'command-name': FRAME_TYPE_COMMAND_NAME, b'command-argument': FRAME_TYPE_COMMAND_ARGUMENT, b'command-data': FRAME_TYPE_COMMAND_DATA, b'bytes-response': FRAME_TYPE_BYTES_RESPONSE, b'error-response': FRAME_TYPE_ERROR_RESPONSE, b'text-output': FRAME_TYPE_TEXT_OUTPUT, } FLAG_COMMAND_NAME_EOS = 0x01 FLAG_COMMAND_NAME_HAVE_ARGS = 0x02 FLAG_COMMAND_NAME_HAVE_DATA = 0x04 FLAGS_COMMAND = { b'eos': FLAG_COMMAND_NAME_EOS, b'have-args': FLAG_COMMAND_NAME_HAVE_ARGS, b'have-data': FLAG_COMMAND_NAME_HAVE_DATA, } FLAG_COMMAND_ARGUMENT_CONTINUATION = 0x01 FLAG_COMMAND_ARGUMENT_EOA = 0x02 FLAGS_COMMAND_ARGUMENT = { b'continuation': FLAG_COMMAND_ARGUMENT_CONTINUATION, b'eoa': FLAG_COMMAND_ARGUMENT_EOA, } FLAG_COMMAND_DATA_CONTINUATION = 0x01 FLAG_COMMAND_DATA_EOS = 0x02 FLAGS_COMMAND_DATA = { b'continuation': FLAG_COMMAND_DATA_CONTINUATION, b'eos': FLAG_COMMAND_DATA_EOS, } FLAG_BYTES_RESPONSE_CONTINUATION = 0x01 FLAG_BYTES_RESPONSE_EOS = 0x02 FLAGS_BYTES_RESPONSE = { b'continuation': FLAG_BYTES_RESPONSE_CONTINUATION, b'eos': FLAG_BYTES_RESPONSE_EOS, } FLAG_ERROR_RESPONSE_PROTOCOL = 0x01 FLAG_ERROR_RESPONSE_APPLICATION = 0x02 FLAGS_ERROR_RESPONSE = { b'protocol': FLAG_ERROR_RESPONSE_PROTOCOL, b'application': FLAG_ERROR_RESPONSE_APPLICATION, } # Maps frame types to their available flags. FRAME_TYPE_FLAGS = { FRAME_TYPE_COMMAND_NAME: FLAGS_COMMAND, FRAME_TYPE_COMMAND_ARGUMENT: FLAGS_COMMAND_ARGUMENT, FRAME_TYPE_COMMAND_DATA: FLAGS_COMMAND_DATA, FRAME_TYPE_BYTES_RESPONSE: FLAGS_BYTES_RESPONSE, FRAME_TYPE_ERROR_RESPONSE: FLAGS_ERROR_RESPONSE, FRAME_TYPE_TEXT_OUTPUT: {}, } ARGUMENT_FRAME_HEADER = struct.Struct(r' This can be used by user-facing applications and tests for creating frames easily without having to type out a bunch of constants. Request ID is an integer. Frame type and flags can be specified by integer or named constant. Flags can be delimited by `|` to bitwise OR them together. """ requestid, frametype, frameflags, payload = s.split(b' ', 3) requestid = int(requestid) if frametype in FRAME_TYPES: frametype = FRAME_TYPES[frametype] else: frametype = int(frametype) finalflags = 0 validflags = FRAME_TYPE_FLAGS[frametype] for flag in frameflags.split(b'|'): if flag in validflags: finalflags |= validflags[flag] else: finalflags |= int(flag) payload = stringutil.unescapestr(payload) return makeframe(requestid=requestid, typeid=frametype, flags=finalflags, payload=payload) def parseheader(data): """Parse a unified framing protocol frame header from a buffer. The header is expected to be in the buffer at offset 0 and the buffer is expected to be large enough to hold a full header. """ # 24 bits payload length (little endian) # 4 bits frame type # 4 bits frame flags # ... payload framelength = data[0] + 256 * data[1] + 16384 * data[2] requestid = struct.unpack_from(r'> 4 frameflags = typeflags & 0x0f return frameheader(framelength, requestid, frametype, frameflags) def readframe(fh): """Read a unified framing protocol frame from a file object. Returns a 3-tuple of (type, flags, payload) for the decoded frame or None if no frame is available. May raise if a malformed frame is seen. """ header = bytearray(FRAME_HEADER_SIZE) readcount = fh.readinto(header) if readcount == 0: return None if readcount != FRAME_HEADER_SIZE: raise error.Abort(_('received incomplete frame: got %d bytes: %s') % (readcount, header)) h = parseheader(header) payload = fh.read(h.length) if len(payload) != h.length: raise error.Abort(_('frame length error: expected %d; got %d') % (h.length, len(payload))) return frame(h.requestid, h.typeid, h.flags, payload) def createcommandframes(requestid, cmd, args, datafh=None): """Create frames necessary to transmit a request to run a command. This is a generator of bytearrays. Each item represents a frame ready to be sent over the wire to a peer. """ flags = 0 if args: flags |= FLAG_COMMAND_NAME_HAVE_ARGS if datafh: flags |= FLAG_COMMAND_NAME_HAVE_DATA if not flags: flags |= FLAG_COMMAND_NAME_EOS yield makeframe(requestid=requestid, typeid=FRAME_TYPE_COMMAND_NAME, flags=flags, payload=cmd) for i, k in enumerate(sorted(args)): v = args[k] last = i == len(args) - 1 # TODO handle splitting of argument values across frames. payload = bytearray(ARGUMENT_FRAME_HEADER.size + len(k) + len(v)) offset = 0 ARGUMENT_FRAME_HEADER.pack_into(payload, offset, len(k), len(v)) offset += ARGUMENT_FRAME_HEADER.size payload[offset:offset + len(k)] = k offset += len(k) payload[offset:offset + len(v)] = v flags = FLAG_COMMAND_ARGUMENT_EOA if last else 0 yield makeframe(requestid=requestid, typeid=FRAME_TYPE_COMMAND_ARGUMENT, flags=flags, payload=payload) if datafh: while True: data = datafh.read(DEFAULT_MAX_FRAME_SIZE) done = False if len(data) == DEFAULT_MAX_FRAME_SIZE: flags = FLAG_COMMAND_DATA_CONTINUATION else: flags = FLAG_COMMAND_DATA_EOS assert datafh.read(1) == b'' done = True yield makeframe(requestid=requestid, typeid=FRAME_TYPE_COMMAND_DATA, flags=flags, payload=data) if done: break def createbytesresponseframesfrombytes(requestid, data, maxframesize=DEFAULT_MAX_FRAME_SIZE): """Create a raw frame to send a bytes response from static bytes input. Returns a generator of bytearrays. """ # Simple case of a single frame. if len(data) <= maxframesize: yield makeframe(requestid=requestid, typeid=FRAME_TYPE_BYTES_RESPONSE, flags=FLAG_BYTES_RESPONSE_EOS, payload=data) return offset = 0 while True: chunk = data[offset:offset + maxframesize] offset += len(chunk) done = offset == len(data) if done: flags = FLAG_BYTES_RESPONSE_EOS else: flags = FLAG_BYTES_RESPONSE_CONTINUATION yield makeframe(requestid=requestid, typeid=FRAME_TYPE_BYTES_RESPONSE, flags=flags, payload=chunk) if done: break def createerrorframe(requestid, msg, protocol=False, application=False): # TODO properly handle frame size limits. assert len(msg) <= DEFAULT_MAX_FRAME_SIZE flags = 0 if protocol: flags |= FLAG_ERROR_RESPONSE_PROTOCOL if application: flags |= FLAG_ERROR_RESPONSE_APPLICATION yield makeframe(requestid=requestid, typeid=FRAME_TYPE_ERROR_RESPONSE, flags=flags, payload=msg) def createtextoutputframe(requestid, atoms): """Create a text output frame to render text to people. ``atoms`` is a 3-tuple of (formatting string, args, labels). The formatting string contains ``%s`` tokens to be replaced by the corresponding indexed entry in ``args``. ``labels`` is an iterable of formatters to be applied at rendering time. In terms of the ``ui`` class, each atom corresponds to a ``ui.write()``. """ bytesleft = DEFAULT_MAX_FRAME_SIZE atomchunks = [] for (formatting, args, labels) in atoms: if len(args) > 255: raise ValueError('cannot use more than 255 formatting arguments') if len(labels) > 255: raise ValueError('cannot use more than 255 labels') # TODO look for localstr, other types here? if not isinstance(formatting, bytes): raise ValueError('must use bytes formatting strings') for arg in args: if not isinstance(arg, bytes): raise ValueError('must use bytes for arguments') for label in labels: if not isinstance(label, bytes): raise ValueError('must use bytes for labels') # Formatting string must be UTF-8. formatting = formatting.decode(r'utf-8', r'replace').encode(r'utf-8') # Arguments must be UTF-8. args = [a.decode(r'utf-8', r'replace').encode(r'utf-8') for a in args] # Labels must be ASCII. labels = [l.decode(r'ascii', r'strict').encode(r'ascii') for l in labels] if len(formatting) > 65535: raise ValueError('formatting string cannot be longer than 64k') if any(len(a) > 65535 for a in args): raise ValueError('argument string cannot be longer than 64k') if any(len(l) > 255 for l in labels): raise ValueError('label string cannot be longer than 255 bytes') chunks = [ struct.pack(r' dict of commands that are actively being received. self._receivingcommands = {} # Request IDs that have been received and are actively being processed. # Once all output for a request has been sent, it is removed from this # set. self._activecommands = set() def onframerecv(self, frame): """Process a frame that has been received off the wire. Returns a dict with an ``action`` key that details what action, if any, the consumer should take next. """ handlers = { 'idle': self._onframeidle, 'command-receiving': self._onframecommandreceiving, 'errored': self._onframeerrored, } meth = handlers.get(self._state) if not meth: raise error.ProgrammingError('unhandled state: %s' % self._state) return meth(frame) def onbytesresponseready(self, requestid, data): """Signal that a bytes response is ready to be sent to the client. The raw bytes response is passed as an argument. """ def sendframes(): for frame in createbytesresponseframesfrombytes(requestid, data): yield frame self._activecommands.remove(requestid) result = sendframes() if self._deferoutput: self._bufferedframegens.append(result) return 'noop', {} else: return 'sendframes', { 'framegen': result, } def oninputeof(self): """Signals that end of input has been received. No more frames will be received. All pending activity should be completed. """ # TODO should we do anything about in-flight commands? if not self._deferoutput or not self._bufferedframegens: return 'noop', {} # If we buffered all our responses, emit those. def makegen(): for gen in self._bufferedframegens: for frame in gen: yield frame return 'sendframes', { 'framegen': makegen(), } def onapplicationerror(self, requestid, msg): return 'sendframes', { 'framegen': createerrorframe(requestid, msg, application=True), } def _makeerrorresult(self, msg): return 'error', { 'message': msg, } def _makeruncommandresult(self, requestid): entry = self._receivingcommands[requestid] del self._receivingcommands[requestid] if self._receivingcommands: self._state = 'command-receiving' else: self._state = 'idle' assert requestid not in self._activecommands self._activecommands.add(requestid) return 'runcommand', { 'requestid': requestid, 'command': entry['command'], 'args': entry['args'], 'data': entry['data'].getvalue() if entry['data'] else None, } def _makewantframeresult(self): return 'wantframe', { 'state': self._state, } def _onframeidle(self, frame): # The only frame type that should be received in this state is a # command request. if frame.typeid != FRAME_TYPE_COMMAND_NAME: self._state = 'errored' return self._makeerrorresult( _('expected command frame; got %d') % frame.typeid) if frame.requestid in self._receivingcommands: self._state = 'errored' return self._makeerrorresult( _('request with ID %d already received') % frame.requestid) if frame.requestid in self._activecommands: self._state = 'errored' return self._makeerrorresult(( _('request with ID %d is already active') % frame.requestid)) expectingargs = bool(frame.flags & FLAG_COMMAND_NAME_HAVE_ARGS) expectingdata = bool(frame.flags & FLAG_COMMAND_NAME_HAVE_DATA) self._receivingcommands[frame.requestid] = { 'command': frame.payload, 'args': {}, 'data': None, 'expectingargs': expectingargs, 'expectingdata': expectingdata, } if frame.flags & FLAG_COMMAND_NAME_EOS: return self._makeruncommandresult(frame.requestid) if expectingargs or expectingdata: self._state = 'command-receiving' return self._makewantframeresult() else: self._state = 'errored' return self._makeerrorresult(_('missing frame flags on ' 'command frame')) def _onframecommandreceiving(self, frame): # It could be a new command request. Process it as such. if frame.typeid == FRAME_TYPE_COMMAND_NAME: return self._onframeidle(frame) # All other frames should be related to a command that is currently # receiving but is not active. if frame.requestid in self._activecommands: self._state = 'errored' return self._makeerrorresult( _('received frame for request that is still active: %d') % frame.requestid) if frame.requestid not in self._receivingcommands: self._state = 'errored' return self._makeerrorresult( _('received frame for request that is not receiving: %d') % frame.requestid) entry = self._receivingcommands[frame.requestid] if frame.typeid == FRAME_TYPE_COMMAND_ARGUMENT: if not entry['expectingargs']: self._state = 'errored' return self._makeerrorresult(_( 'received command argument frame for request that is not ' 'expecting arguments: %d') % frame.requestid) return self._handlecommandargsframe(frame, entry) elif frame.typeid == FRAME_TYPE_COMMAND_DATA: if not entry['expectingdata']: self._state = 'errored' return self._makeerrorresult(_( 'received command data frame for request that is not ' 'expecting data: %d') % frame.requestid) if entry['data'] is None: entry['data'] = util.bytesio() return self._handlecommanddataframe(frame, entry) def _handlecommandargsframe(self, frame, entry): # The frame and state of command should have already been validated. assert frame.typeid == FRAME_TYPE_COMMAND_ARGUMENT offset = 0 namesize, valuesize = ARGUMENT_FRAME_HEADER.unpack_from(frame.payload) offset += ARGUMENT_FRAME_HEADER.size # The argument name MUST fit inside the frame. argname = bytes(frame.payload[offset:offset + namesize]) offset += namesize if len(argname) != namesize: self._state = 'errored' return self._makeerrorresult(_('malformed argument frame: ' 'partial argument name')) argvalue = bytes(frame.payload[offset:]) # Argument value spans multiple frames. Record our active state # and wait for the next frame. if frame.flags & FLAG_COMMAND_ARGUMENT_CONTINUATION: raise error.ProgrammingError('not yet implemented') # Common case: the argument value is completely contained in this # frame. if len(argvalue) != valuesize: self._state = 'errored' return self._makeerrorresult(_('malformed argument frame: ' 'partial argument value')) entry['args'][argname] = argvalue if frame.flags & FLAG_COMMAND_ARGUMENT_EOA: if entry['expectingdata']: # TODO signal request to run a command once we don't # buffer data frames. return self._makewantframeresult() else: return self._makeruncommandresult(frame.requestid) else: return self._makewantframeresult() def _handlecommanddataframe(self, frame, entry): assert frame.typeid == FRAME_TYPE_COMMAND_DATA # TODO support streaming data instead of buffering it. entry['data'].write(frame.payload) if frame.flags & FLAG_COMMAND_DATA_CONTINUATION: return self._makewantframeresult() elif frame.flags & FLAG_COMMAND_DATA_EOS: entry['data'].seek(0) return self._makeruncommandresult(frame.requestid) else: self._state = 'errored' return self._makeerrorresult(_('command data frame without ' 'flags')) def _onframeerrored(self, frame): return self._makeerrorresult(_('server already errored'))