diff --git a/mercurial/wireprotoframing.py b/mercurial/wireprotoframing.py --- a/mercurial/wireprotoframing.py +++ b/mercurial/wireprotoframing.py @@ -14,6 +14,9 @@ from __future__ import absolute_import import struct from .i18n import _ +from .thirdparty import ( + attr, +) from . import ( error, util, @@ -92,6 +95,24 @@ FRAME_TYPE_FLAGS = { ARGUMENT_FRAME_HEADER = struct.Struct(r'> 4 frameflags = typeflags & 0x0f - return requestid, frametype, frameflags, framelength + return frameheader(framelength, requestid, frametype, frameflags) def readframe(fh): """Read a unified framing protocol frame from a file object. @@ -184,14 +205,14 @@ def readframe(fh): raise error.Abort(_('received incomplete frame: got %d bytes: %s') % (readcount, header)) - requestid, frametype, frameflags, framelength = parseheader(header) + h = parseheader(header) - payload = fh.read(framelength) - if len(payload) != framelength: + payload = fh.read(h.length) + if len(payload) != h.length: raise error.Abort(_('frame length error: expected %d; got %d') % - (framelength, len(payload))) + (h.length, len(payload))) - return requestid, frametype, frameflags, payload + return frame(h.requestid, h.typeid, h.flags, payload) def createcommandframes(requestid, cmd, args, datafh=None): """Create frames necessary to transmit a request to run a command. @@ -433,7 +454,7 @@ class serverreactor(object): # request id -> dict of commands that are actively being received. self._receivingcommands = {} - def onframerecv(self, requestid, frametype, frameflags, payload): + def onframerecv(self, frame): """Process a frame that has been received off the wire. Returns a dict with an ``action`` key that details what action, @@ -449,7 +470,7 @@ class serverreactor(object): if not meth: raise error.ProgrammingError('unhandled state: %s' % self._state) - return meth(requestid, frametype, frameflags, payload) + return meth(frame) def onbytesresponseready(self, requestid, data): """Signal that a bytes response is ready to be sent to the client. @@ -518,32 +539,32 @@ class serverreactor(object): 'state': self._state, } - def _onframeidle(self, requestid, frametype, frameflags, payload): + def _onframeidle(self, frame): # The only frame type that should be received in this state is a # command request. - if frametype != FRAME_TYPE_COMMAND_NAME: + if frame.typeid != FRAME_TYPE_COMMAND_NAME: self._state = 'errored' return self._makeerrorresult( - _('expected command frame; got %d') % frametype) + _('expected command frame; got %d') % frame.typeid) - if requestid in self._receivingcommands: + if frame.requestid in self._receivingcommands: self._state = 'errored' return self._makeerrorresult( - _('request with ID %d already received') % requestid) + _('request with ID %d already received') % frame.requestid) - expectingargs = bool(frameflags & FLAG_COMMAND_NAME_HAVE_ARGS) - expectingdata = bool(frameflags & FLAG_COMMAND_NAME_HAVE_DATA) + expectingargs = bool(frame.flags & FLAG_COMMAND_NAME_HAVE_ARGS) + expectingdata = bool(frame.flags & FLAG_COMMAND_NAME_HAVE_DATA) - self._receivingcommands[requestid] = { - 'command': payload, + self._receivingcommands[frame.requestid] = { + 'command': frame.payload, 'args': {}, 'data': None, 'expectingargs': expectingargs, 'expectingdata': expectingdata, } - if frameflags & FLAG_COMMAND_NAME_EOS: - return self._makeruncommandresult(requestid) + if frame.flags & FLAG_COMMAND_NAME_EOS: + return self._makeruncommandresult(frame.requestid) if expectingargs or expectingdata: self._state = 'command-receiving' @@ -553,56 +574,52 @@ class serverreactor(object): return self._makeerrorresult(_('missing frame flags on ' 'command frame')) - def _onframecommandreceiving(self, requestid, frametype, frameflags, - payload): + def _onframecommandreceiving(self, frame): # It could be a new command request. Process it as such. - if frametype == FRAME_TYPE_COMMAND_NAME: - return self._onframeidle(requestid, frametype, frameflags, payload) + if frame.typeid == FRAME_TYPE_COMMAND_NAME: + return self._onframeidle(frame) # All other frames should be related to a command that is currently # receiving. - if requestid not in self._receivingcommands: + if frame.requestid not in self._receivingcommands: self._state = 'errored' return self._makeerrorresult( _('received frame for request that is not receiving: %d') % - requestid) + frame.requestid) - entry = self._receivingcommands[requestid] + entry = self._receivingcommands[frame.requestid] - if frametype == FRAME_TYPE_COMMAND_ARGUMENT: + if frame.typeid == FRAME_TYPE_COMMAND_ARGUMENT: if not entry['expectingargs']: self._state = 'errored' return self._makeerrorresult(_( 'received command argument frame for request that is not ' - 'expecting arguments: %d') % requestid) + 'expecting arguments: %d') % frame.requestid) - return self._handlecommandargsframe(requestid, entry, frametype, - frameflags, payload) + return self._handlecommandargsframe(frame, entry) - elif frametype == FRAME_TYPE_COMMAND_DATA: + elif frame.typeid == FRAME_TYPE_COMMAND_DATA: if not entry['expectingdata']: self._state = 'errored' return self._makeerrorresult(_( 'received command data frame for request that is not ' - 'expecting data: %d') % requestid) + 'expecting data: %d') % frame.requestid) if entry['data'] is None: entry['data'] = util.bytesio() - return self._handlecommanddataframe(requestid, entry, frametype, - frameflags, payload) + return self._handlecommanddataframe(frame, entry) - def _handlecommandargsframe(self, requestid, entry, frametype, frameflags, - payload): + def _handlecommandargsframe(self, frame, entry): # The frame and state of command should have already been validated. - assert frametype == FRAME_TYPE_COMMAND_ARGUMENT + assert frame.typeid == FRAME_TYPE_COMMAND_ARGUMENT offset = 0 - namesize, valuesize = ARGUMENT_FRAME_HEADER.unpack_from(payload) + namesize, valuesize = ARGUMENT_FRAME_HEADER.unpack_from(frame.payload) offset += ARGUMENT_FRAME_HEADER.size # The argument name MUST fit inside the frame. - argname = bytes(payload[offset:offset + namesize]) + argname = bytes(frame.payload[offset:offset + namesize]) offset += namesize if len(argname) != namesize: @@ -610,11 +627,11 @@ class serverreactor(object): return self._makeerrorresult(_('malformed argument frame: ' 'partial argument name')) - argvalue = bytes(payload[offset:]) + argvalue = bytes(frame.payload[offset:]) # Argument value spans multiple frames. Record our active state # and wait for the next frame. - if frameflags & FLAG_COMMAND_ARGUMENT_CONTINUATION: + if frame.flags & FLAG_COMMAND_ARGUMENT_CONTINUATION: raise error.ProgrammingError('not yet implemented') # Common case: the argument value is completely contained in this @@ -627,32 +644,31 @@ class serverreactor(object): entry['args'][argname] = argvalue - if frameflags & FLAG_COMMAND_ARGUMENT_EOA: + if frame.flags & FLAG_COMMAND_ARGUMENT_EOA: if entry['expectingdata']: # TODO signal request to run a command once we don't # buffer data frames. return self._makewantframeresult() else: - return self._makeruncommandresult(requestid) + return self._makeruncommandresult(frame.requestid) else: return self._makewantframeresult() - def _handlecommanddataframe(self, requestid, entry, frametype, frameflags, - payload): - assert frametype == FRAME_TYPE_COMMAND_DATA + def _handlecommanddataframe(self, frame, entry): + assert frame.typeid == FRAME_TYPE_COMMAND_DATA # TODO support streaming data instead of buffering it. - entry['data'].write(payload) + entry['data'].write(frame.payload) - if frameflags & FLAG_COMMAND_DATA_CONTINUATION: + if frame.flags & FLAG_COMMAND_DATA_CONTINUATION: return self._makewantframeresult() - elif frameflags & FLAG_COMMAND_DATA_EOS: + elif frame.flags & FLAG_COMMAND_DATA_EOS: entry['data'].seek(0) - return self._makeruncommandresult(requestid) + return self._makeruncommandresult(frame.requestid) else: self._state = 'errored' return self._makeerrorresult(_('command data frame without ' 'flags')) - def _onframeerrored(self, requestid, frametype, frameflags, payload): + def _onframeerrored(self, frame): return self._makeerrorresult(_('server already errored')) diff --git a/mercurial/wireprotoserver.py b/mercurial/wireprotoserver.py --- a/mercurial/wireprotoserver.py +++ b/mercurial/wireprotoserver.py @@ -400,12 +400,11 @@ def _processhttpv2reflectrequest(ui, rep states.append(b'received: ') break - requestid, frametype, frameflags, payload = frame - states.append(b'received: %d %d %d %s' % (frametype, frameflags, - requestid, payload)) + states.append(b'received: %d %d %d %s' % (frame.typeid, frame.flags, + frame.requestid, + frame.payload)) - action, meta = reactor.onframerecv(requestid, frametype, frameflags, - payload) + action, meta = reactor.onframerecv(frame) states.append(json.dumps((action, meta), sort_keys=True, separators=(', ', ': '))) @@ -434,7 +433,7 @@ def _processhttpv2request(ui, repo, req, if not frame: break - action, meta = reactor.onframerecv(*frame) + action, meta = reactor.onframerecv(frame) if action == 'wantframe': # Need more data before we can do anything. diff --git a/tests/test-wireproto-serverreactor.py b/tests/test-wireproto-serverreactor.py --- a/tests/test-wireproto-serverreactor.py +++ b/tests/test-wireproto-serverreactor.py @@ -18,11 +18,14 @@ def sendframes(reactor, gen): Emits a generator of results from ``onframerecv()`` calls. """ for frame in gen: - rid, frametype, frameflags, framelength = framing.parseheader(frame) + header = framing.parseheader(frame) payload = frame[framing.FRAME_HEADER_SIZE:] - assert len(payload) == framelength + assert len(payload) == header.length - yield reactor.onframerecv(rid, frametype, frameflags, payload) + yield reactor.onframerecv(framing.frame(header.requestid, + header.typeid, + header.flags, + payload)) def sendcommandframes(reactor, rid, cmd, args, datafh=None): """Generate frames to run a command and send them to a reactor."""