# HG changeset patch # User Gregory Szorc # Date 2018-10-04 23:26:45 # Node ID 327d40b94bedd65d715c5268caa4337cd106dd87 # Parent e2fe1074024cb0d15c40f73e31618e304100d0b4 wireprotov2: handle sender protocol settings frames We teach the server reactor to handle the optional sender protocol settings frames, which can only be sent at the beginning of frame exchange. Right now, we simply decode the data and record the sender protocol settings on the server reactor instance: we don't yet do anything meaningful with the data. Differential Revision: https://phab.mercurial-scm.org/D4916 diff --git a/mercurial/wireprotoframing.py b/mercurial/wireprotoframing.py --- a/mercurial/wireprotoframing.py +++ b/mercurial/wireprotoframing.py @@ -674,6 +674,10 @@ def ensureserverstream(stream): 'numbered streams; %d is not even' % stream.streamid) +DEFAULT_PROTOCOL_SETTINGS = { + 'contentencodings': [b'identity'], +} + class serverreactor(object): """Holds state of a server handling frame-based protocol requests. @@ -750,7 +754,7 @@ class serverreactor(object): sender cannot receive until all data has been transmitted. """ self._deferoutput = deferoutput - self._state = 'idle' + self._state = 'initial' self._nextoutgoingstreamid = 2 self._bufferedframegens = [] # stream id -> stream instance for all active streams from the client. @@ -763,6 +767,11 @@ class serverreactor(object): # set. self._activecommands = set() + self._protocolsettingsdecoder = None + + # Sender protocol settings are optional. Set implied default values. + self._sendersettings = dict(DEFAULT_PROTOCOL_SETTINGS) + def onframerecv(self, frame): """Process a frame that has been received off the wire. @@ -794,6 +803,8 @@ class serverreactor(object): del self._incomingstreams[frame.streamid] handlers = { + 'initial': self._onframeinitial, + 'protocol-settings-receiving': self._onframeprotocolsettings, 'idle': self._onframeidle, 'command-receiving': self._onframecommandreceiving, 'errored': self._onframeerrored, @@ -1062,6 +1073,85 @@ class serverreactor(object): _('received command request frame with neither new nor ' 'continuation flags set')) + def _onframeinitial(self, frame): + # Called when we receive a frame when in the "initial" state. + if frame.typeid == FRAME_TYPE_SENDER_PROTOCOL_SETTINGS: + self._state = 'protocol-settings-receiving' + self._protocolsettingsdecoder = cborutil.bufferingdecoder() + return self._onframeprotocolsettings(frame) + + elif frame.typeid == FRAME_TYPE_COMMAND_REQUEST: + self._state = 'idle' + return self._onframeidle(frame) + + else: + self._state = 'errored' + return self._makeerrorresult( + _('expected sender protocol settings or command request ' + 'frame; got %d') % frame.typeid) + + def _onframeprotocolsettings(self, frame): + assert self._state == 'protocol-settings-receiving' + assert self._protocolsettingsdecoder is not None + + if frame.typeid != FRAME_TYPE_SENDER_PROTOCOL_SETTINGS: + self._state = 'errored' + return self._makeerrorresult( + _('expected sender protocol settings frame; got %d') % + frame.typeid) + + more = frame.flags & FLAG_SENDER_PROTOCOL_SETTINGS_CONTINUATION + eos = frame.flags & FLAG_SENDER_PROTOCOL_SETTINGS_EOS + + if more and eos: + self._state = 'errored' + return self._makeerrorresult( + _('sender protocol settings frame cannot have both ' + 'continuation and end of stream flags set')) + + if not more and not eos: + self._state = 'errored' + return self._makeerrorresult( + _('sender protocol settings frame must have continuation or ' + 'end of stream flag set')) + + # TODO establish limits for maximum amount of data that can be + # buffered. + try: + self._protocolsettingsdecoder.decode(frame.payload) + except Exception as e: + self._state = 'errored' + return self._makeerrorresult( + _('error decoding CBOR from sender protocol settings frame: %s') + % stringutil.forcebytestr(e)) + + if more: + return self._makewantframeresult() + + assert eos + + decoded = self._protocolsettingsdecoder.getavailable() + self._protocolsettingsdecoder = None + + if not decoded: + self._state = 'errored' + return self._makeerrorresult( + _('sender protocol settings frame did not contain CBOR data')) + elif len(decoded) > 1: + self._state = 'errored' + return self._makeerrorresult( + _('sender protocol settings frame contained multiple CBOR ' + 'values')) + + d = decoded[0] + + if b'contentencodings' in d: + self._sendersettings['contentencodings'] = d[b'contentencodings'] + + self._state = 'idle' + + return self._makewantframeresult() + def _onframeidle(self, frame): # The only frame type that should be received in this state is a # command request. diff --git a/tests/test-wireproto-serverreactor.py b/tests/test-wireproto-serverreactor.py --- a/tests/test-wireproto-serverreactor.py +++ b/tests/test-wireproto-serverreactor.py @@ -9,6 +9,9 @@ from mercurial import ( util, wireprotoframing as framing, ) +from mercurial.utils import ( + cborutil, +) ffs = framing.makeframefromhumanstring @@ -193,7 +196,8 @@ class ServerReactorTests(unittest.TestCa ffs(b'1 1 stream-begin command-data 0 ignored')) self.assertaction(result, b'error') self.assertEqual(result[1], { - b'message': b'expected command request frame; got 2', + b'message': b'expected sender protocol settings or command request ' + b'frame; got 2', }) def testunexpectedcommanddatareceiving(self): @@ -494,6 +498,105 @@ class ServerReactorTests(unittest.TestCa results = list(sendcommandframes(reactor, instream, 1, b'command1', {})) self.assertaction(results[0], b'runcommand') + def testprotocolsettingsnoflags(self): + result = self._sendsingleframe( + makereactor(), + ffs(b'0 1 stream-begin sender-protocol-settings 0 ')) + self.assertaction(result, b'error') + self.assertEqual(result[1], { + b'message': b'sender protocol settings frame must have ' + b'continuation or end of stream flag set', + }) + + def testprotocolsettingsconflictflags(self): + result = self._sendsingleframe( + makereactor(), + ffs(b'0 1 stream-begin sender-protocol-settings continuation|eos ')) + self.assertaction(result, b'error') + self.assertEqual(result[1], { + b'message': b'sender protocol settings frame cannot have both ' + b'continuation and end of stream flags set', + }) + + def testprotocolsettingsemptypayload(self): + result = self._sendsingleframe( + makereactor(), + ffs(b'0 1 stream-begin sender-protocol-settings eos ')) + self.assertaction(result, b'error') + self.assertEqual(result[1], { + b'message': b'sender protocol settings frame did not contain CBOR ' + b'data', + }) + + def testprotocolsettingsmultipleobjects(self): + result = self._sendsingleframe( + makereactor(), + ffs(b'0 1 stream-begin sender-protocol-settings eos ' + b'\x46foobar\x43foo')) + self.assertaction(result, b'error') + self.assertEqual(result[1], { + b'message': b'sender protocol settings frame contained multiple ' + b'CBOR values', + }) + + def testprotocolsettingscontentencodings(self): + reactor = makereactor() + + result = self._sendsingleframe( + reactor, + ffs(b'0 1 stream-begin sender-protocol-settings eos ' + b'cbor:{b"contentencodings": [b"a", b"b"]}')) + self.assertaction(result, b'wantframe') + + self.assertEqual(reactor._state, b'idle') + self.assertEqual(reactor._sendersettings[b'contentencodings'], + [b'a', b'b']) + + def testprotocolsettingsmultipleframes(self): + reactor = makereactor() + + data = b''.join(cborutil.streamencode({ + b'contentencodings': [b'value1', b'value2'], + })) + + results = list(sendframes(reactor, [ + ffs(b'0 1 stream-begin sender-protocol-settings continuation %s' % + data[0:5]), + ffs(b'0 1 0 sender-protocol-settings eos %s' % data[5:]), + ])) + + self.assertEqual(len(results), 2) + + self.assertaction(results[0], b'wantframe') + self.assertaction(results[1], b'wantframe') + + self.assertEqual(reactor._state, b'idle') + self.assertEqual(reactor._sendersettings[b'contentencodings'], + [b'value1', b'value2']) + + def testprotocolsettingsbadcbor(self): + result = self._sendsingleframe( + makereactor(), + ffs(b'0 1 stream-begin sender-protocol-settings eos badvalue')) + self.assertaction(result, b'error') + + def testprotocolsettingsnoninitial(self): + # Cannot have protocol settings frames as non-initial frames. + reactor = makereactor() + + stream = framing.stream(1) + results = list(sendcommandframes(reactor, stream, 1, b'mycommand', {})) + self.assertEqual(len(results), 1) + self.assertaction(results[0], b'runcommand') + + result = self._sendsingleframe( + reactor, + ffs(b'0 1 0 sender-protocol-settings eos ')) + self.assertaction(result, b'error') + self.assertEqual(result[1], { + b'message': b'expected command request frame; got 8', + }) + if __name__ == '__main__': import silenttestrunner silenttestrunner.main(__name__)