test-wireproto-clientreactor.py
765 lines
| 22.4 KiB
| text/x-python
|
PythonLexer
/ tests / test-wireproto-clientreactor.py
Augie Fackler
|
r40516 | import sys | ||
Gregory Szorc
|
r37561 | import unittest | ||
Gregory Szorc
|
r40167 | import zlib | ||
Gregory Szorc
|
r37561 | |||
from mercurial import ( | ||||
error, | ||||
Gregory Szorc
|
r40165 | ui as uimod, | ||
Gregory Szorc
|
r37561 | wireprotoframing as framing, | ||
) | ||||
Augie Fackler
|
r43346 | from mercurial.utils import cborutil | ||
Gregory Szorc
|
r37561 | |||
Gregory Szorc
|
r40167 | try: | ||
from mercurial import zstd | ||||
Augie Fackler
|
r43346 | |||
Gregory Szorc
|
r40167 | zstd.__version__ | ||
except ImportError: | ||||
zstd = None | ||||
Gregory Szorc
|
r37562 | ffs = framing.makeframefromhumanstring | ||
Gregory Szorc
|
r40165 | globalui = uimod.ui() | ||
Augie Fackler
|
r43346 | |||
Gregory Szorc
|
r37562 | def sendframe(reactor, frame): | ||
"""Send a frame bytearray to a reactor.""" | ||||
header = framing.parseheader(frame) | ||||
Augie Fackler
|
r43346 | payload = frame[framing.FRAME_HEADER_SIZE :] | ||
Gregory Szorc
|
r37562 | assert len(payload) == header.length | ||
Augie Fackler
|
r43346 | return reactor.onframerecv( | ||
framing.frame( | ||||
header.requestid, | ||||
header.streamid, | ||||
header.streamflags, | ||||
header.typeid, | ||||
header.flags, | ||||
payload, | ||||
) | ||||
) | ||||
Gregory Szorc
|
r37562 | |||
Gregory Szorc
|
r37561 | class SingleSendTests(unittest.TestCase): | ||
"""A reactor that can only send once rejects subsequent sends.""" | ||||
Augie Fackler
|
r37733 | |||
if not getattr(unittest.TestCase, 'assertRaisesRegex', False): | ||||
# Python 3.7 deprecates the regex*p* version, but 2.7 lacks | ||||
# the regex version. | ||||
Augie Fackler
|
r43346 | assertRaisesRegex = ( # camelcase-required | ||
unittest.TestCase.assertRaisesRegexp | ||||
) | ||||
Augie Fackler
|
r37733 | |||
Gregory Szorc
|
r37561 | def testbasic(self): | ||
Augie Fackler
|
r43346 | reactor = framing.clientreactor( | ||
globalui, hasmultiplesend=False, buffersends=True | ||||
) | ||||
Gregory Szorc
|
r37561 | |||
request, action, meta = reactor.callcommand(b'foo', {}) | ||||
Augie Fackler
|
r37700 | self.assertEqual(request.state, b'pending') | ||
self.assertEqual(action, b'noop') | ||||
Gregory Szorc
|
r37561 | |||
action, meta = reactor.flushcommands() | ||||
Augie Fackler
|
r37700 | self.assertEqual(action, b'sendframes') | ||
Gregory Szorc
|
r37561 | |||
Augie Fackler
|
r37700 | for frame in meta[b'framegen']: | ||
self.assertEqual(request.state, b'sending') | ||||
Gregory Szorc
|
r37561 | |||
Augie Fackler
|
r37700 | self.assertEqual(request.state, b'sent') | ||
Gregory Szorc
|
r37561 | |||
Augie Fackler
|
r43346 | with self.assertRaisesRegex( | ||
error.ProgrammingError, 'cannot issue new commands' | ||||
): | ||||
Gregory Szorc
|
r37561 | reactor.callcommand(b'foo', {}) | ||
Augie Fackler
|
r43346 | with self.assertRaisesRegex( | ||
error.ProgrammingError, 'cannot issue new commands' | ||||
): | ||||
Gregory Szorc
|
r37561 | reactor.callcommand(b'foo', {}) | ||
Augie Fackler
|
r43346 | |||
Gregory Szorc
|
r37561 | class NoBufferTests(unittest.TestCase): | ||
"""A reactor without send buffering sends requests immediately.""" | ||||
Augie Fackler
|
r43346 | |||
Gregory Szorc
|
r37561 | def testbasic(self): | ||
Augie Fackler
|
r43346 | reactor = framing.clientreactor( | ||
globalui, hasmultiplesend=True, buffersends=False | ||||
) | ||||
Gregory Szorc
|
r37561 | |||
request, action, meta = reactor.callcommand(b'command1', {}) | ||||
self.assertEqual(request.requestid, 1) | ||||
Augie Fackler
|
r37700 | self.assertEqual(action, b'sendframes') | ||
Gregory Szorc
|
r37561 | |||
Augie Fackler
|
r37700 | self.assertEqual(request.state, b'pending') | ||
Gregory Szorc
|
r37561 | |||
Augie Fackler
|
r37700 | for frame in meta[b'framegen']: | ||
self.assertEqual(request.state, b'sending') | ||||
Gregory Szorc
|
r37561 | |||
Augie Fackler
|
r37700 | self.assertEqual(request.state, b'sent') | ||
Gregory Szorc
|
r37561 | |||
action, meta = reactor.flushcommands() | ||||
Augie Fackler
|
r37700 | self.assertEqual(action, b'noop') | ||
Gregory Szorc
|
r37561 | |||
# And we can send another command. | ||||
request, action, meta = reactor.callcommand(b'command2', {}) | ||||
self.assertEqual(request.requestid, 3) | ||||
Augie Fackler
|
r37700 | self.assertEqual(action, b'sendframes') | ||
Gregory Szorc
|
r37561 | |||
Augie Fackler
|
r37700 | for frame in meta[b'framegen']: | ||
self.assertEqual(request.state, b'sending') | ||||
Gregory Szorc
|
r37561 | |||
Augie Fackler
|
r37700 | self.assertEqual(request.state, b'sent') | ||
Gregory Szorc
|
r37561 | |||
Augie Fackler
|
r43346 | |||
Gregory Szorc
|
r37562 | class BadFrameRecvTests(unittest.TestCase): | ||
Augie Fackler
|
r37733 | if not getattr(unittest.TestCase, 'assertRaisesRegex', False): | ||
# Python 3.7 deprecates the regex*p* version, but 2.7 lacks | ||||
# the regex version. | ||||
Augie Fackler
|
r43346 | assertRaisesRegex = ( # camelcase-required | ||
unittest.TestCase.assertRaisesRegexp | ||||
) | ||||
Augie Fackler
|
r37733 | |||
Gregory Szorc
|
r37562 | def testoddstream(self): | ||
Gregory Szorc
|
r40165 | reactor = framing.clientreactor(globalui) | ||
Gregory Szorc
|
r37562 | |||
action, meta = sendframe(reactor, ffs(b'1 1 0 1 0 foo')) | ||||
Augie Fackler
|
r37700 | self.assertEqual(action, b'error') | ||
Augie Fackler
|
r43346 | self.assertEqual( | ||
meta[b'message'], b'received frame with odd numbered stream ID: 1' | ||||
) | ||||
Gregory Szorc
|
r37562 | |||
def testunknownstream(self): | ||||
Gregory Szorc
|
r40165 | reactor = framing.clientreactor(globalui) | ||
Gregory Szorc
|
r37562 | |||
action, meta = sendframe(reactor, ffs(b'1 0 0 1 0 foo')) | ||||
Augie Fackler
|
r37700 | self.assertEqual(action, b'error') | ||
Augie Fackler
|
r43346 | self.assertEqual( | ||
meta[b'message'], | ||||
b'received frame on unknown stream without beginning ' | ||||
b'of stream flag set', | ||||
) | ||||
Gregory Szorc
|
r37562 | |||
def testunhandledframetype(self): | ||||
Gregory Szorc
|
r40165 | reactor = framing.clientreactor(globalui, buffersends=False) | ||
Gregory Szorc
|
r37562 | |||
request, action, meta = reactor.callcommand(b'foo', {}) | ||||
Augie Fackler
|
r37700 | for frame in meta[b'framegen']: | ||
Gregory Szorc
|
r37562 | pass | ||
Augie Fackler
|
r43346 | with self.assertRaisesRegex( | ||
error.ProgrammingError, 'unhandled frame type' | ||||
): | ||||
Gregory Szorc
|
r37562 | sendframe(reactor, ffs(b'1 0 stream-begin text-output 0 foo')) | ||
Augie Fackler
|
r43346 | |||
Gregory Szorc
|
r37674 | class StreamTests(unittest.TestCase): | ||
def testmultipleresponseframes(self): | ||||
Gregory Szorc
|
r40165 | reactor = framing.clientreactor(globalui, buffersends=False) | ||
Gregory Szorc
|
r37674 | |||
request, action, meta = reactor.callcommand(b'foo', {}) | ||||
Augie Fackler
|
r37700 | self.assertEqual(action, b'sendframes') | ||
for f in meta[b'framegen']: | ||||
Gregory Szorc
|
r37674 | pass | ||
action, meta = sendframe( | ||||
reactor, | ||||
Augie Fackler
|
r43346 | ffs( | ||
b'%d 0 stream-begin command-response 0 foo' % request.requestid | ||||
), | ||||
) | ||||
Augie Fackler
|
r37700 | self.assertEqual(action, b'responsedata') | ||
Gregory Szorc
|
r37674 | |||
action, meta = sendframe( | ||||
Augie Fackler
|
r43346 | reactor, ffs(b'%d 0 0 command-response eos bar' % request.requestid) | ||
) | ||||
Augie Fackler
|
r37700 | self.assertEqual(action, b'responsedata') | ||
Gregory Szorc
|
r37674 | |||
Augie Fackler
|
r43346 | |||
Gregory Szorc
|
r40060 | class RedirectTests(unittest.TestCase): | ||
def testredirect(self): | ||||
Gregory Szorc
|
r40165 | reactor = framing.clientreactor(globalui, buffersends=False) | ||
Gregory Szorc
|
r40060 | |||
redirect = { | ||||
b'targets': [b'a', b'b'], | ||||
b'hashes': [b'sha256'], | ||||
} | ||||
request, action, meta = reactor.callcommand( | ||||
Augie Fackler
|
r43346 | b'foo', {}, redirect=redirect | ||
) | ||||
Gregory Szorc
|
r40060 | |||
self.assertEqual(action, b'sendframes') | ||||
frames = list(meta[b'framegen']) | ||||
self.assertEqual(len(frames), 1) | ||||
Augie Fackler
|
r43346 | self.assertEqual( | ||
frames[0], | ||||
ffs( | ||||
b'1 1 stream-begin command-request new ' | ||||
b"cbor:{b'name': b'foo', " | ||||
b"b'redirect': {b'targets': [b'a', b'b'], " | ||||
b"b'hashes': [b'sha256']}}" | ||||
), | ||||
) | ||||
Gregory Szorc
|
r40060 | |||
Gregory Szorc
|
r40164 | class StreamSettingsTests(unittest.TestCase): | ||
def testnoflags(self): | ||||
Gregory Szorc
|
r40165 | reactor = framing.clientreactor(globalui, buffersends=False) | ||
Gregory Szorc
|
r40164 | |||
request, action, meta = reactor.callcommand(b'foo', {}) | ||||
for f in meta[b'framegen']: | ||||
pass | ||||
Augie Fackler
|
r43346 | action, meta = sendframe( | ||
reactor, ffs(b'1 2 stream-begin stream-settings 0 ') | ||||
) | ||||
Gregory Szorc
|
r40164 | |||
self.assertEqual(action, b'error') | ||||
Augie Fackler
|
r43346 | self.assertEqual( | ||
meta, | ||||
{ | ||||
b'message': b'stream encoding settings frame must have ' | ||||
b'continuation or end of stream flag set', | ||||
}, | ||||
) | ||||
Gregory Szorc
|
r40164 | |||
def testconflictflags(self): | ||||
Gregory Szorc
|
r40165 | reactor = framing.clientreactor(globalui, buffersends=False) | ||
Gregory Szorc
|
r40164 | |||
request, action, meta = reactor.callcommand(b'foo', {}) | ||||
for f in meta[b'framegen']: | ||||
pass | ||||
Augie Fackler
|
r43346 | action, meta = sendframe( | ||
reactor, ffs(b'1 2 stream-begin stream-settings continuation|eos ') | ||||
) | ||||
Gregory Szorc
|
r40164 | |||
self.assertEqual(action, b'error') | ||||
Augie Fackler
|
r43346 | self.assertEqual( | ||
meta, | ||||
{ | ||||
b'message': b'stream encoding settings frame cannot have both ' | ||||
b'continuation and end of stream flags set', | ||||
}, | ||||
) | ||||
Gregory Szorc
|
r40164 | |||
def testemptypayload(self): | ||||
Gregory Szorc
|
r40165 | reactor = framing.clientreactor(globalui, buffersends=False) | ||
Gregory Szorc
|
r40164 | |||
request, action, meta = reactor.callcommand(b'foo', {}) | ||||
for f in meta[b'framegen']: | ||||
pass | ||||
Augie Fackler
|
r43346 | action, meta = sendframe( | ||
reactor, ffs(b'1 2 stream-begin stream-settings eos ') | ||||
) | ||||
Gregory Szorc
|
r40164 | |||
self.assertEqual(action, b'error') | ||||
Augie Fackler
|
r43346 | self.assertEqual( | ||
meta, | ||||
{ | ||||
b'message': b'stream encoding settings frame did not contain ' | ||||
b'CBOR data' | ||||
}, | ||||
) | ||||
Gregory Szorc
|
r40164 | |||
def testbadcbor(self): | ||||
Gregory Szorc
|
r40165 | reactor = framing.clientreactor(globalui, buffersends=False) | ||
Gregory Szorc
|
r40164 | |||
request, action, meta = reactor.callcommand(b'foo', {}) | ||||
for f in meta[b'framegen']: | ||||
pass | ||||
Augie Fackler
|
r43346 | action, meta = sendframe( | ||
reactor, ffs(b'1 2 stream-begin stream-settings eos badvalue') | ||||
) | ||||
Gregory Szorc
|
r40164 | |||
self.assertEqual(action, b'error') | ||||
def testsingleobject(self): | ||||
Gregory Szorc
|
r40165 | reactor = framing.clientreactor(globalui, buffersends=False) | ||
Gregory Szorc
|
r40164 | |||
request, action, meta = reactor.callcommand(b'foo', {}) | ||||
for f in meta[b'framegen']: | ||||
pass | ||||
Augie Fackler
|
r43346 | action, meta = sendframe( | ||
reactor, | ||||
ffs(b'1 2 stream-begin stream-settings eos cbor:b"identity"'), | ||||
) | ||||
Gregory Szorc
|
r40164 | |||
self.assertEqual(action, b'noop') | ||||
self.assertEqual(meta, {}) | ||||
def testmultipleobjects(self): | ||||
Gregory Szorc
|
r40165 | reactor = framing.clientreactor(globalui, buffersends=False) | ||
Gregory Szorc
|
r40164 | |||
request, action, meta = reactor.callcommand(b'foo', {}) | ||||
for f in meta[b'framegen']: | ||||
pass | ||||
Augie Fackler
|
r43346 | data = b''.join( | ||
[ | ||||
b''.join(cborutil.streamencode(b'identity')), | ||||
b''.join(cborutil.streamencode({b'foo', b'bar'})), | ||||
] | ||||
) | ||||
Gregory Szorc
|
r40164 | |||
Augie Fackler
|
r43346 | action, meta = sendframe( | ||
reactor, ffs(b'1 2 stream-begin stream-settings eos %s' % data) | ||||
) | ||||
Gregory Szorc
|
r40164 | |||
Gregory Szorc
|
r40167 | self.assertEqual(action, b'error') | ||
Augie Fackler
|
r43346 | self.assertEqual( | ||
meta, | ||||
{ | ||||
b'message': b'error setting stream decoder: identity decoder ' | ||||
b'received unexpected additional values', | ||||
}, | ||||
) | ||||
Gregory Szorc
|
r40164 | |||
def testmultipleframes(self): | ||||
Gregory Szorc
|
r40165 | reactor = framing.clientreactor(globalui, buffersends=False) | ||
Gregory Szorc
|
r40164 | |||
request, action, meta = reactor.callcommand(b'foo', {}) | ||||
for f in meta[b'framegen']: | ||||
pass | ||||
data = b''.join(cborutil.streamencode(b'identity')) | ||||
Augie Fackler
|
r43346 | action, meta = sendframe( | ||
reactor, | ||||
ffs( | ||||
b'1 2 stream-begin stream-settings continuation %s' % data[0:3] | ||||
), | ||||
) | ||||
Gregory Szorc
|
r40164 | |||
self.assertEqual(action, b'noop') | ||||
self.assertEqual(meta, {}) | ||||
Augie Fackler
|
r43346 | action, meta = sendframe( | ||
reactor, ffs(b'1 2 0 stream-settings eos %s' % data[3:]) | ||||
) | ||||
Gregory Szorc
|
r40164 | |||
self.assertEqual(action, b'noop') | ||||
self.assertEqual(meta, {}) | ||||
Gregory Szorc
|
r40167 | def testinvalidencoder(self): | ||
reactor = framing.clientreactor(globalui, buffersends=False) | ||||
request, action, meta = reactor.callcommand(b'foo', {}) | ||||
for f in meta[b'framegen']: | ||||
pass | ||||
Augie Fackler
|
r43346 | action, meta = sendframe( | ||
reactor, | ||||
ffs(b'1 2 stream-begin stream-settings eos cbor:b"badvalue"'), | ||||
) | ||||
Gregory Szorc
|
r40167 | |||
self.assertEqual(action, b'error') | ||||
Augie Fackler
|
r43346 | self.assertEqual( | ||
meta, | ||||
{ | ||||
b'message': b'error setting stream decoder: unknown stream ' | ||||
b'decoder: badvalue', | ||||
}, | ||||
) | ||||
Gregory Szorc
|
r40167 | |||
def testzlibencoding(self): | ||||
reactor = framing.clientreactor(globalui, buffersends=False) | ||||
request, action, meta = reactor.callcommand(b'foo', {}) | ||||
for f in meta[b'framegen']: | ||||
pass | ||||
Augie Fackler
|
r43346 | action, meta = sendframe( | ||
reactor, | ||||
ffs( | ||||
b'%d 2 stream-begin stream-settings eos cbor:b"zlib"' | ||||
% request.requestid | ||||
), | ||||
) | ||||
Gregory Szorc
|
r40167 | |||
self.assertEqual(action, b'noop') | ||||
self.assertEqual(meta, {}) | ||||
result = { | ||||
b'status': b'ok', | ||||
} | ||||
encoded = b''.join(cborutil.streamencode(result)) | ||||
compressed = zlib.compress(encoded) | ||||
self.assertEqual(zlib.decompress(compressed), encoded) | ||||
Augie Fackler
|
r43346 | action, meta = sendframe( | ||
reactor, | ||||
ffs( | ||||
b'%d 2 encoded command-response eos %s' | ||||
% (request.requestid, compressed) | ||||
), | ||||
) | ||||
Gregory Szorc
|
r40167 | |||
self.assertEqual(action, b'responsedata') | ||||
self.assertEqual(meta[b'data'], encoded) | ||||
def testzlibencodingsinglebyteframes(self): | ||||
reactor = framing.clientreactor(globalui, buffersends=False) | ||||
request, action, meta = reactor.callcommand(b'foo', {}) | ||||
for f in meta[b'framegen']: | ||||
pass | ||||
Augie Fackler
|
r43346 | action, meta = sendframe( | ||
reactor, | ||||
ffs( | ||||
b'%d 2 stream-begin stream-settings eos cbor:b"zlib"' | ||||
% request.requestid | ||||
), | ||||
) | ||||
Gregory Szorc
|
r40167 | |||
self.assertEqual(action, b'noop') | ||||
self.assertEqual(meta, {}) | ||||
result = { | ||||
b'status': b'ok', | ||||
} | ||||
encoded = b''.join(cborutil.streamencode(result)) | ||||
compressed = zlib.compress(encoded) | ||||
self.assertEqual(zlib.decompress(compressed), encoded) | ||||
chunks = [] | ||||
for i in range(len(compressed)): | ||||
Augie Fackler
|
r43346 | char = compressed[i : i + 1] | ||
Gregory Szorc
|
r40167 | if char == b'\\': | ||
char = b'\\\\' | ||||
Augie Fackler
|
r43346 | action, meta = sendframe( | ||
reactor, | ||||
ffs( | ||||
b'%d 2 encoded command-response continuation %s' | ||||
% (request.requestid, char) | ||||
), | ||||
) | ||||
Gregory Szorc
|
r40167 | |||
self.assertEqual(action, b'responsedata') | ||||
chunks.append(meta[b'data']) | ||||
self.assertTrue(meta[b'expectmore']) | ||||
self.assertFalse(meta[b'eos']) | ||||
# zlib will have the full data decoded at this point, even though | ||||
# we haven't flushed. | ||||
self.assertEqual(b''.join(chunks), encoded) | ||||
# End the stream for good measure. | ||||
Augie Fackler
|
r43346 | action, meta = sendframe( | ||
reactor, | ||||
ffs(b'%d 2 stream-end command-response eos ' % request.requestid), | ||||
) | ||||
Gregory Szorc
|
r40167 | |||
self.assertEqual(action, b'responsedata') | ||||
self.assertEqual(meta[b'data'], b'') | ||||
self.assertFalse(meta[b'expectmore']) | ||||
self.assertTrue(meta[b'eos']) | ||||
def testzlibmultipleresponses(self): | ||||
# We feed in zlib compressed data on the same stream but belonging to | ||||
# 2 different requests. This tests our flushing behavior. | ||||
Augie Fackler
|
r43346 | reactor = framing.clientreactor( | ||
globalui, buffersends=False, hasmultiplesend=True | ||||
) | ||||
Gregory Szorc
|
r40167 | |||
request1, action, meta = reactor.callcommand(b'foo', {}) | ||||
for f in meta[b'framegen']: | ||||
pass | ||||
request2, action, meta = reactor.callcommand(b'foo', {}) | ||||
for f in meta[b'framegen']: | ||||
pass | ||||
outstream = framing.outputstream(2) | ||||
outstream.setencoder(globalui, b'zlib') | ||||
Augie Fackler
|
r43346 | response1 = b''.join( | ||
cborutil.streamencode( | ||||
Augie Fackler
|
r46554 | { | ||
b'status': b'ok', | ||||
b'extra': b'response1' * 10, | ||||
} | ||||
Augie Fackler
|
r43346 | ) | ||
) | ||||
Gregory Szorc
|
r40167 | |||
Augie Fackler
|
r43346 | response2 = b''.join( | ||
cborutil.streamencode( | ||||
Augie Fackler
|
r46554 | { | ||
b'status': b'error', | ||||
b'extra': b'response2' * 10, | ||||
} | ||||
Augie Fackler
|
r43346 | ) | ||
) | ||||
Gregory Szorc
|
r40167 | |||
Augie Fackler
|
r43346 | action, meta = sendframe( | ||
reactor, | ||||
ffs( | ||||
b'%d 2 stream-begin stream-settings eos cbor:b"zlib"' | ||||
% request1.requestid | ||||
), | ||||
) | ||||
Gregory Szorc
|
r40167 | |||
self.assertEqual(action, b'noop') | ||||
self.assertEqual(meta, {}) | ||||
# Feeding partial data in won't get anything useful out. | ||||
Augie Fackler
|
r43346 | action, meta = sendframe( | ||
reactor, | ||||
ffs( | ||||
b'%d 2 encoded command-response continuation %s' | ||||
% (request1.requestid, outstream.encode(response1)) | ||||
), | ||||
) | ||||
Gregory Szorc
|
r40167 | self.assertEqual(action, b'responsedata') | ||
self.assertEqual(meta[b'data'], b'') | ||||
# But flushing data at both ends will get our original data. | ||||
Augie Fackler
|
r43346 | action, meta = sendframe( | ||
reactor, | ||||
ffs( | ||||
b'%d 2 encoded command-response eos %s' | ||||
% (request1.requestid, outstream.flush()) | ||||
), | ||||
) | ||||
Gregory Szorc
|
r40167 | self.assertEqual(action, b'responsedata') | ||
self.assertEqual(meta[b'data'], response1) | ||||
# We should be able to reuse the compressor/decompressor for the | ||||
# 2nd response. | ||||
Augie Fackler
|
r43346 | action, meta = sendframe( | ||
reactor, | ||||
ffs( | ||||
b'%d 2 encoded command-response continuation %s' | ||||
% (request2.requestid, outstream.encode(response2)) | ||||
), | ||||
) | ||||
Gregory Szorc
|
r40167 | self.assertEqual(action, b'responsedata') | ||
self.assertEqual(meta[b'data'], b'') | ||||
Augie Fackler
|
r43346 | action, meta = sendframe( | ||
reactor, | ||||
ffs( | ||||
b'%d 2 encoded command-response eos %s' | ||||
% (request2.requestid, outstream.flush()) | ||||
), | ||||
) | ||||
Gregory Szorc
|
r40167 | self.assertEqual(action, b'responsedata') | ||
self.assertEqual(meta[b'data'], response2) | ||||
@unittest.skipUnless(zstd, 'zstd not available') | ||||
def testzstd8mbencoding(self): | ||||
reactor = framing.clientreactor(globalui, buffersends=False) | ||||
request, action, meta = reactor.callcommand(b'foo', {}) | ||||
for f in meta[b'framegen']: | ||||
pass | ||||
Augie Fackler
|
r43346 | action, meta = sendframe( | ||
reactor, | ||||
ffs( | ||||
b'%d 2 stream-begin stream-settings eos cbor:b"zstd-8mb"' | ||||
% request.requestid | ||||
), | ||||
) | ||||
Gregory Szorc
|
r40167 | |||
self.assertEqual(action, b'noop') | ||||
self.assertEqual(meta, {}) | ||||
result = { | ||||
b'status': b'ok', | ||||
} | ||||
encoded = b''.join(cborutil.streamencode(result)) | ||||
encoder = framing.zstd8mbencoder(globalui) | ||||
compressed = encoder.encode(encoded) + encoder.finish() | ||||
Augie Fackler
|
r43346 | self.assertEqual( | ||
zstd.ZstdDecompressor().decompress( | ||||
compressed, max_output_size=len(encoded) | ||||
), | ||||
encoded, | ||||
) | ||||
Gregory Szorc
|
r40167 | |||
Augie Fackler
|
r43346 | action, meta = sendframe( | ||
reactor, | ||||
ffs( | ||||
b'%d 2 encoded command-response eos %s' | ||||
% (request.requestid, compressed) | ||||
), | ||||
) | ||||
Gregory Szorc
|
r40167 | |||
self.assertEqual(action, b'responsedata') | ||||
self.assertEqual(meta[b'data'], encoded) | ||||
@unittest.skipUnless(zstd, 'zstd not available') | ||||
def testzstd8mbencodingsinglebyteframes(self): | ||||
reactor = framing.clientreactor(globalui, buffersends=False) | ||||
request, action, meta = reactor.callcommand(b'foo', {}) | ||||
for f in meta[b'framegen']: | ||||
pass | ||||
Augie Fackler
|
r43346 | action, meta = sendframe( | ||
reactor, | ||||
ffs( | ||||
b'%d 2 stream-begin stream-settings eos cbor:b"zstd-8mb"' | ||||
% request.requestid | ||||
), | ||||
) | ||||
Gregory Szorc
|
r40167 | |||
self.assertEqual(action, b'noop') | ||||
self.assertEqual(meta, {}) | ||||
result = { | ||||
b'status': b'ok', | ||||
} | ||||
encoded = b''.join(cborutil.streamencode(result)) | ||||
compressed = zstd.ZstdCompressor().compress(encoded) | ||||
Augie Fackler
|
r43346 | self.assertEqual( | ||
zstd.ZstdDecompressor().decompress(compressed), encoded | ||||
) | ||||
Gregory Szorc
|
r40167 | |||
chunks = [] | ||||
for i in range(len(compressed)): | ||||
Augie Fackler
|
r43346 | char = compressed[i : i + 1] | ||
Gregory Szorc
|
r40167 | if char == b'\\': | ||
char = b'\\\\' | ||||
Augie Fackler
|
r43346 | action, meta = sendframe( | ||
reactor, | ||||
ffs( | ||||
b'%d 2 encoded command-response continuation %s' | ||||
% (request.requestid, char) | ||||
), | ||||
) | ||||
Gregory Szorc
|
r40167 | |||
self.assertEqual(action, b'responsedata') | ||||
chunks.append(meta[b'data']) | ||||
self.assertTrue(meta[b'expectmore']) | ||||
self.assertFalse(meta[b'eos']) | ||||
# zstd decompressor will flush at frame boundaries. | ||||
self.assertEqual(b''.join(chunks), encoded) | ||||
# End the stream for good measure. | ||||
Augie Fackler
|
r43346 | action, meta = sendframe( | ||
reactor, | ||||
ffs(b'%d 2 stream-end command-response eos ' % request.requestid), | ||||
) | ||||
Gregory Szorc
|
r40167 | |||
self.assertEqual(action, b'responsedata') | ||||
self.assertEqual(meta[b'data'], b'') | ||||
self.assertFalse(meta[b'expectmore']) | ||||
self.assertTrue(meta[b'eos']) | ||||
@unittest.skipUnless(zstd, 'zstd not available') | ||||
def testzstd8mbmultipleresponses(self): | ||||
# We feed in zstd compressed data on the same stream but belonging to | ||||
# 2 different requests. This tests our flushing behavior. | ||||
Augie Fackler
|
r43346 | reactor = framing.clientreactor( | ||
globalui, buffersends=False, hasmultiplesend=True | ||||
) | ||||
Gregory Szorc
|
r40167 | |||
request1, action, meta = reactor.callcommand(b'foo', {}) | ||||
for f in meta[b'framegen']: | ||||
pass | ||||
request2, action, meta = reactor.callcommand(b'foo', {}) | ||||
for f in meta[b'framegen']: | ||||
pass | ||||
outstream = framing.outputstream(2) | ||||
outstream.setencoder(globalui, b'zstd-8mb') | ||||
Augie Fackler
|
r43346 | response1 = b''.join( | ||
cborutil.streamencode( | ||||
Augie Fackler
|
r46554 | { | ||
b'status': b'ok', | ||||
b'extra': b'response1' * 10, | ||||
} | ||||
Augie Fackler
|
r43346 | ) | ||
) | ||||
Gregory Szorc
|
r40167 | |||
Augie Fackler
|
r43346 | response2 = b''.join( | ||
cborutil.streamencode( | ||||
Augie Fackler
|
r46554 | { | ||
b'status': b'error', | ||||
b'extra': b'response2' * 10, | ||||
} | ||||
Augie Fackler
|
r43346 | ) | ||
) | ||||
Gregory Szorc
|
r40167 | |||
Augie Fackler
|
r43346 | action, meta = sendframe( | ||
reactor, | ||||
ffs( | ||||
b'%d 2 stream-begin stream-settings eos cbor:b"zstd-8mb"' | ||||
% request1.requestid | ||||
), | ||||
) | ||||
Gregory Szorc
|
r40167 | |||
self.assertEqual(action, b'noop') | ||||
self.assertEqual(meta, {}) | ||||
# Feeding partial data in won't get anything useful out. | ||||
Augie Fackler
|
r43346 | action, meta = sendframe( | ||
reactor, | ||||
ffs( | ||||
b'%d 2 encoded command-response continuation %s' | ||||
% (request1.requestid, outstream.encode(response1)) | ||||
), | ||||
) | ||||
Gregory Szorc
|
r40167 | self.assertEqual(action, b'responsedata') | ||
self.assertEqual(meta[b'data'], b'') | ||||
# But flushing data at both ends will get our original data. | ||||
Augie Fackler
|
r43346 | action, meta = sendframe( | ||
reactor, | ||||
ffs( | ||||
b'%d 2 encoded command-response eos %s' | ||||
% (request1.requestid, outstream.flush()) | ||||
), | ||||
) | ||||
Gregory Szorc
|
r40167 | self.assertEqual(action, b'responsedata') | ||
self.assertEqual(meta[b'data'], response1) | ||||
# We should be able to reuse the compressor/decompressor for the | ||||
# 2nd response. | ||||
Augie Fackler
|
r43346 | action, meta = sendframe( | ||
reactor, | ||||
ffs( | ||||
b'%d 2 encoded command-response continuation %s' | ||||
% (request2.requestid, outstream.encode(response2)) | ||||
), | ||||
) | ||||
Gregory Szorc
|
r40167 | self.assertEqual(action, b'responsedata') | ||
self.assertEqual(meta[b'data'], b'') | ||||
Augie Fackler
|
r43346 | action, meta = sendframe( | ||
reactor, | ||||
ffs( | ||||
b'%d 2 encoded command-response eos %s' | ||||
% (request2.requestid, outstream.flush()) | ||||
), | ||||
) | ||||
Gregory Szorc
|
r40167 | self.assertEqual(action, b'responsedata') | ||
self.assertEqual(meta[b'data'], response2) | ||||
Augie Fackler
|
r43346 | |||
Gregory Szorc
|
r37561 | if __name__ == '__main__': | ||
Augie Fackler
|
r41154 | if (3, 6, 0) <= sys.version_info < (3, 6, 4): | ||
Augie Fackler
|
r40516 | # Python 3.6.0 through 3.6.3 inclusive shipped with | ||
# https://bugs.python.org/issue31825 and we can't run these | ||||
# tests on those specific versions of Python. Sigh. | ||||
sys.exit(80) | ||||
Gregory Szorc
|
r37561 | import silenttestrunner | ||
Augie Fackler
|
r43346 | |||
Gregory Szorc
|
r37561 | silenttestrunner.main(__name__) | ||