"""Tests for the client reactor in ``mercurial.wireprotoframing``."""

from __future__ import absolute_import

import unittest

from mercurial import (
    error,
    wireprotoframing as framing,
)

# Shorthand for building a frame from its human-readable string form.
ffs = framing.makeframefromhumanstring


def sendframe(reactor, frame):
    """Send a frame bytearray to a reactor.

    The header is parsed from ``frame`` and everything after the first
    ``framing.FRAME_HEADER_SIZE`` bytes is treated as the payload. The
    pieces are repackaged as a ``framing.frame`` and handed to
    ``reactor.onframerecv()``.

    Returns whatever ``reactor.onframerecv()`` returns; the tests in this
    file unpack that as an ``(action, meta)`` pair.
    """
    header = framing.parseheader(frame)
    payload = frame[framing.FRAME_HEADER_SIZE:]
    # Sanity check on the test input: the declared length must match the
    # bytes that actually follow the header.
    assert len(payload) == header.length

    return reactor.onframerecv(framing.frame(header.requestid,
                                             header.streamid,
                                             header.streamflags,
                                             header.typeid,
                                             header.flags,
                                             payload))


class SingleSendTests(unittest.TestCase):
    """A reactor that can only send once rejects subsequent sends."""

    if not hasattr(unittest.TestCase, 'assertRaisesRegex'):
        # Python 2.7 only provides the ``assertRaisesRegexp`` spelling,
        # which Python 3 deprecated and Python 3.12 removed. Alias the
        # modern name so the test runs on both.
        assertRaisesRegex = unittest.TestCase.assertRaisesRegexp

    def testbasic(self):
        """Buffered single-send reactor: one flush, then no new commands."""
        reactor = framing.clientreactor(hasmultiplesend=False,
                                        buffersends=True)

        # With buffering on, issuing a command only queues it.
        request, action, meta = reactor.callcommand(b'foo', {})
        self.assertEqual(request.state, 'pending')
        self.assertEqual(action, 'noop')

        action, meta = reactor.flushcommands()
        self.assertEqual(action, 'sendframes')

        # The request reports 'sending' while its frames are being
        # consumed and 'sent' once the generator is exhausted.
        for frame in meta['framegen']:
            self.assertEqual(request.state, 'sending')

        self.assertEqual(request.state, 'sent')

        with self.assertRaisesRegex(error.ProgrammingError,
                                    'cannot issue new commands'):
            reactor.callcommand(b'foo', {})

        # A second attempt is rejected the same way.
        with self.assertRaisesRegex(error.ProgrammingError,
                                    'cannot issue new commands'):
            reactor.callcommand(b'foo', {})


class NoBufferTests(unittest.TestCase):
    """A reactor without send buffering sends requests immediately."""

    def testbasic(self):
        """Unbuffered commands yield frames at once; flushing is a no-op."""
        reactor = framing.clientreactor(hasmultiplesend=True,
                                        buffersends=False)

        request, action, meta = reactor.callcommand(b'command1', {})
        self.assertEqual(request.requestid, 1)
        self.assertEqual(action, 'sendframes')

        # The request stays 'pending' until its frame generator is
        # actually iterated.
        self.assertEqual(request.state, 'pending')

        for frame in meta['framegen']:
            self.assertEqual(request.state, 'sending')

        self.assertEqual(request.state, 'sent')

        # Nothing was buffered, so there is nothing to flush.
        action, meta = reactor.flushcommands()
        self.assertEqual(action, 'noop')

        # And we can send another command.
        request, action, meta = reactor.callcommand(b'command2', {})
        self.assertEqual(request.requestid, 3)
        self.assertEqual(action, 'sendframes')

        for frame in meta['framegen']:
            self.assertEqual(request.state, 'sending')

        self.assertEqual(request.state, 'sent')


class BadFrameRecvTests(unittest.TestCase):
    """Receiving malformed or unexpected frames is reported as an error."""

    if not hasattr(unittest.TestCase, 'assertRaisesRegex'):
        # Python 2.7 only provides the ``assertRaisesRegexp`` spelling,
        # which Python 3 deprecated and Python 3.12 removed. Alias the
        # modern name so the tests run on both.
        assertRaisesRegex = unittest.TestCase.assertRaisesRegexp

    def testoddstream(self):
        """A frame on stream ID 1 produces an 'error' action."""
        reactor = framing.clientreactor()

        action, meta = sendframe(reactor, ffs(b'1 1 0 1 0 foo'))
        self.assertEqual(action, 'error')
        self.assertEqual(meta['message'],
                         'received frame with odd numbered stream ID: 1')

    def testunknownstream(self):
        """A first frame without the stream-begin flag is an error."""
        reactor = framing.clientreactor()

        action, meta = sendframe(reactor, ffs(b'1 0 0 1 0 foo'))
        self.assertEqual(action, 'error')
        self.assertEqual(meta['message'],
                         'received frame on unknown stream without beginning '
                         'of stream flag set')

    def testunhandledframetype(self):
        """A text-output frame for a sent request raises ProgrammingError."""
        reactor = framing.clientreactor(buffersends=False)

        request, action, meta = reactor.callcommand(b'foo', {})
        # Drain the generator so the request is fully sent before a
        # response frame is fed back in.
        for frame in meta['framegen']:
            pass

        with self.assertRaisesRegex(error.ProgrammingError,
                                    'unhandled frame type'):
            sendframe(reactor, ffs(b'1 0 stream-begin text-output 0 foo'))


class StreamTests(unittest.TestCase):
    """Response frames arriving on a stream are surfaced to the caller."""

    def testmultipleresponseframes(self):
        """Two response frames on one stream each yield 'responsedata'."""
        reactor = framing.clientreactor(buffersends=False)

        request, action, meta = reactor.callcommand(b'foo', {})

        self.assertEqual(action, 'sendframes')
        # Drain the generator so the request is fully sent.
        for f in meta['framegen']:
            pass

        # First frame opens the stream.
        action, meta = sendframe(
            reactor,
            ffs(b'%d 0 stream-begin 4 0 foo' % request.requestid))
        self.assertEqual(action, 'responsedata')

        # Second frame reuses the stream and carries the 'eos' flag.
        action, meta = sendframe(
            reactor,
            ffs(b'%d 0 0 4 eos bar' % request.requestid))
        self.assertEqual(action, 'responsedata')


if __name__ == '__main__':
    import silenttestrunner
    silenttestrunner.main(__name__)