test-wireproto-clientreactor.py
604 lines
| 20.8 KiB
| text/x-python
|
PythonLexer
/ tests / test-wireproto-clientreactor.py
Gregory Szorc
|
r37561 | from __future__ import absolute_import | ||
import unittest | ||||
Gregory Szorc
|
r40167 | import zlib | ||
Gregory Szorc
|
r37561 | |||
from mercurial import ( | ||||
error, | ||||
Gregory Szorc
|
r40165 | ui as uimod, | ||
Gregory Szorc
|
r37561 | wireprotoframing as framing, | ||
) | ||||
Gregory Szorc
|
r40164 | from mercurial.utils import ( | ||
cborutil, | ||||
) | ||||
Gregory Szorc
|
r37561 | |||
Gregory Szorc
|
r40167 | try: | ||
from mercurial import zstd | ||||
zstd.__version__ | ||||
except ImportError: | ||||
zstd = None | ||||
Gregory Szorc
|
r37562 | ffs = framing.makeframefromhumanstring | ||
Gregory Szorc
|
r40165 | globalui = uimod.ui() | ||
Gregory Szorc
|
r37562 | def sendframe(reactor, frame): | ||
"""Send a frame bytearray to a reactor.""" | ||||
header = framing.parseheader(frame) | ||||
payload = frame[framing.FRAME_HEADER_SIZE:] | ||||
assert len(payload) == header.length | ||||
return reactor.onframerecv(framing.frame(header.requestid, | ||||
header.streamid, | ||||
header.streamflags, | ||||
header.typeid, | ||||
header.flags, | ||||
payload)) | ||||
Gregory Szorc
|
r37561 | class SingleSendTests(unittest.TestCase): | ||
"""A reactor that can only send once rejects subsequent sends.""" | ||||
Augie Fackler
|
r37733 | |||
if not getattr(unittest.TestCase, 'assertRaisesRegex', False): | ||||
# Python 3.7 deprecates the regex*p* version, but 2.7 lacks | ||||
# the regex version. | ||||
assertRaisesRegex = (# camelcase-required | ||||
unittest.TestCase.assertRaisesRegexp) | ||||
Gregory Szorc
|
r37561 | def testbasic(self): | ||
Gregory Szorc
|
r40165 | reactor = framing.clientreactor(globalui, | ||
hasmultiplesend=False, | ||||
buffersends=True) | ||||
Gregory Szorc
|
r37561 | |||
request, action, meta = reactor.callcommand(b'foo', {}) | ||||
Augie Fackler
|
r37700 | self.assertEqual(request.state, b'pending') | ||
self.assertEqual(action, b'noop') | ||||
Gregory Szorc
|
r37561 | |||
action, meta = reactor.flushcommands() | ||||
Augie Fackler
|
r37700 | self.assertEqual(action, b'sendframes') | ||
Gregory Szorc
|
r37561 | |||
Augie Fackler
|
r37700 | for frame in meta[b'framegen']: | ||
self.assertEqual(request.state, b'sending') | ||||
Gregory Szorc
|
r37561 | |||
Augie Fackler
|
r37700 | self.assertEqual(request.state, b'sent') | ||
Gregory Szorc
|
r37561 | |||
Augie Fackler
|
r37733 | with self.assertRaisesRegex(error.ProgrammingError, | ||
Gregory Szorc
|
r37561 | 'cannot issue new commands'): | ||
reactor.callcommand(b'foo', {}) | ||||
Augie Fackler
|
r37733 | with self.assertRaisesRegex(error.ProgrammingError, | ||
Gregory Szorc
|
r37561 | 'cannot issue new commands'): | ||
reactor.callcommand(b'foo', {}) | ||||
class NoBufferTests(unittest.TestCase): | ||||
"""A reactor without send buffering sends requests immediately.""" | ||||
def testbasic(self): | ||||
Gregory Szorc
|
r40165 | reactor = framing.clientreactor(globalui, | ||
hasmultiplesend=True, | ||||
buffersends=False) | ||||
Gregory Szorc
|
r37561 | |||
request, action, meta = reactor.callcommand(b'command1', {}) | ||||
self.assertEqual(request.requestid, 1) | ||||
Augie Fackler
|
r37700 | self.assertEqual(action, b'sendframes') | ||
Gregory Szorc
|
r37561 | |||
Augie Fackler
|
r37700 | self.assertEqual(request.state, b'pending') | ||
Gregory Szorc
|
r37561 | |||
Augie Fackler
|
r37700 | for frame in meta[b'framegen']: | ||
self.assertEqual(request.state, b'sending') | ||||
Gregory Szorc
|
r37561 | |||
Augie Fackler
|
r37700 | self.assertEqual(request.state, b'sent') | ||
Gregory Szorc
|
r37561 | |||
action, meta = reactor.flushcommands() | ||||
Augie Fackler
|
r37700 | self.assertEqual(action, b'noop') | ||
Gregory Szorc
|
r37561 | |||
# And we can send another command. | ||||
request, action, meta = reactor.callcommand(b'command2', {}) | ||||
self.assertEqual(request.requestid, 3) | ||||
Augie Fackler
|
r37700 | self.assertEqual(action, b'sendframes') | ||
Gregory Szorc
|
r37561 | |||
Augie Fackler
|
r37700 | for frame in meta[b'framegen']: | ||
self.assertEqual(request.state, b'sending') | ||||
Gregory Szorc
|
r37561 | |||
Augie Fackler
|
r37700 | self.assertEqual(request.state, b'sent') | ||
Gregory Szorc
|
r37561 | |||
Gregory Szorc
|
r37562 | class BadFrameRecvTests(unittest.TestCase): | ||
Augie Fackler
|
r37733 | if not getattr(unittest.TestCase, 'assertRaisesRegex', False): | ||
# Python 3.7 deprecates the regex*p* version, but 2.7 lacks | ||||
# the regex version. | ||||
assertRaisesRegex = (# camelcase-required | ||||
unittest.TestCase.assertRaisesRegexp) | ||||
Gregory Szorc
|
r37562 | def testoddstream(self): | ||
Gregory Szorc
|
r40165 | reactor = framing.clientreactor(globalui) | ||
Gregory Szorc
|
r37562 | |||
action, meta = sendframe(reactor, ffs(b'1 1 0 1 0 foo')) | ||||
Augie Fackler
|
r37700 | self.assertEqual(action, b'error') | ||
self.assertEqual(meta[b'message'], | ||||
b'received frame with odd numbered stream ID: 1') | ||||
Gregory Szorc
|
r37562 | |||
def testunknownstream(self): | ||||
Gregory Szorc
|
r40165 | reactor = framing.clientreactor(globalui) | ||
Gregory Szorc
|
r37562 | |||
action, meta = sendframe(reactor, ffs(b'1 0 0 1 0 foo')) | ||||
Augie Fackler
|
r37700 | self.assertEqual(action, b'error') | ||
self.assertEqual(meta[b'message'], | ||||
b'received frame on unknown stream without beginning ' | ||||
b'of stream flag set') | ||||
Gregory Szorc
|
r37562 | |||
def testunhandledframetype(self): | ||||
Gregory Szorc
|
r40165 | reactor = framing.clientreactor(globalui, buffersends=False) | ||
Gregory Szorc
|
r37562 | |||
request, action, meta = reactor.callcommand(b'foo', {}) | ||||
Augie Fackler
|
r37700 | for frame in meta[b'framegen']: | ||
Gregory Szorc
|
r37562 | pass | ||
Augie Fackler
|
r37733 | with self.assertRaisesRegex(error.ProgrammingError, | ||
Gregory Szorc
|
r37562 | 'unhandled frame type'): | ||
sendframe(reactor, ffs(b'1 0 stream-begin text-output 0 foo')) | ||||
Gregory Szorc
|
r37674 | class StreamTests(unittest.TestCase): | ||
def testmultipleresponseframes(self): | ||||
Gregory Szorc
|
r40165 | reactor = framing.clientreactor(globalui, buffersends=False) | ||
Gregory Szorc
|
r37674 | |||
request, action, meta = reactor.callcommand(b'foo', {}) | ||||
Augie Fackler
|
r37700 | self.assertEqual(action, b'sendframes') | ||
for f in meta[b'framegen']: | ||||
Gregory Szorc
|
r37674 | pass | ||
action, meta = sendframe( | ||||
reactor, | ||||
Gregory Szorc
|
r37742 | ffs(b'%d 0 stream-begin command-response 0 foo' % | ||
request.requestid)) | ||||
Augie Fackler
|
r37700 | self.assertEqual(action, b'responsedata') | ||
Gregory Szorc
|
r37674 | |||
action, meta = sendframe( | ||||
reactor, | ||||
Gregory Szorc
|
r37742 | ffs(b'%d 0 0 command-response eos bar' % request.requestid)) | ||
Augie Fackler
|
r37700 | self.assertEqual(action, b'responsedata') | ||
Gregory Szorc
|
r37674 | |||
Gregory Szorc
|
r40060 | class RedirectTests(unittest.TestCase): | ||
def testredirect(self): | ||||
Gregory Szorc
|
r40165 | reactor = framing.clientreactor(globalui, buffersends=False) | ||
Gregory Szorc
|
r40060 | |||
redirect = { | ||||
b'targets': [b'a', b'b'], | ||||
b'hashes': [b'sha256'], | ||||
} | ||||
request, action, meta = reactor.callcommand( | ||||
b'foo', {}, redirect=redirect) | ||||
self.assertEqual(action, b'sendframes') | ||||
frames = list(meta[b'framegen']) | ||||
self.assertEqual(len(frames), 1) | ||||
self.assertEqual(frames[0], | ||||
ffs(b'1 1 stream-begin command-request new ' | ||||
b"cbor:{b'name': b'foo', " | ||||
b"b'redirect': {b'targets': [b'a', b'b'], " | ||||
b"b'hashes': [b'sha256']}}")) | ||||
Gregory Szorc
|
r40164 | class StreamSettingsTests(unittest.TestCase): | ||
def testnoflags(self): | ||||
Gregory Szorc
|
r40165 | reactor = framing.clientreactor(globalui, buffersends=False) | ||
Gregory Szorc
|
r40164 | |||
request, action, meta = reactor.callcommand(b'foo', {}) | ||||
for f in meta[b'framegen']: | ||||
pass | ||||
action, meta = sendframe(reactor, | ||||
ffs(b'1 2 stream-begin stream-settings 0 ')) | ||||
self.assertEqual(action, b'error') | ||||
self.assertEqual(meta, { | ||||
b'message': b'stream encoding settings frame must have ' | ||||
b'continuation or end of stream flag set', | ||||
}) | ||||
def testconflictflags(self): | ||||
Gregory Szorc
|
r40165 | reactor = framing.clientreactor(globalui, buffersends=False) | ||
Gregory Szorc
|
r40164 | |||
request, action, meta = reactor.callcommand(b'foo', {}) | ||||
for f in meta[b'framegen']: | ||||
pass | ||||
action, meta = sendframe(reactor, | ||||
ffs(b'1 2 stream-begin stream-settings continuation|eos ')) | ||||
self.assertEqual(action, b'error') | ||||
self.assertEqual(meta, { | ||||
b'message': b'stream encoding settings frame cannot have both ' | ||||
b'continuation and end of stream flags set', | ||||
}) | ||||
def testemptypayload(self): | ||||
Gregory Szorc
|
r40165 | reactor = framing.clientreactor(globalui, buffersends=False) | ||
Gregory Szorc
|
r40164 | |||
request, action, meta = reactor.callcommand(b'foo', {}) | ||||
for f in meta[b'framegen']: | ||||
pass | ||||
action, meta = sendframe(reactor, | ||||
ffs(b'1 2 stream-begin stream-settings eos ')) | ||||
self.assertEqual(action, b'error') | ||||
self.assertEqual(meta, { | ||||
b'message': b'stream encoding settings frame did not contain ' | ||||
b'CBOR data' | ||||
}) | ||||
def testbadcbor(self): | ||||
Gregory Szorc
|
r40165 | reactor = framing.clientreactor(globalui, buffersends=False) | ||
Gregory Szorc
|
r40164 | |||
request, action, meta = reactor.callcommand(b'foo', {}) | ||||
for f in meta[b'framegen']: | ||||
pass | ||||
action, meta = sendframe(reactor, | ||||
ffs(b'1 2 stream-begin stream-settings eos badvalue')) | ||||
self.assertEqual(action, b'error') | ||||
def testsingleobject(self): | ||||
Gregory Szorc
|
r40165 | reactor = framing.clientreactor(globalui, buffersends=False) | ||
Gregory Szorc
|
r40164 | |||
request, action, meta = reactor.callcommand(b'foo', {}) | ||||
for f in meta[b'framegen']: | ||||
pass | ||||
action, meta = sendframe(reactor, | ||||
ffs(b'1 2 stream-begin stream-settings eos cbor:b"identity"')) | ||||
self.assertEqual(action, b'noop') | ||||
self.assertEqual(meta, {}) | ||||
def testmultipleobjects(self): | ||||
Gregory Szorc
|
r40165 | reactor = framing.clientreactor(globalui, buffersends=False) | ||
Gregory Szorc
|
r40164 | |||
request, action, meta = reactor.callcommand(b'foo', {}) | ||||
for f in meta[b'framegen']: | ||||
pass | ||||
data = b''.join([ | ||||
b''.join(cborutil.streamencode(b'identity')), | ||||
b''.join(cborutil.streamencode({b'foo', b'bar'})), | ||||
]) | ||||
action, meta = sendframe(reactor, | ||||
ffs(b'1 2 stream-begin stream-settings eos %s' % data)) | ||||
Gregory Szorc
|
r40167 | self.assertEqual(action, b'error') | ||
self.assertEqual(meta, { | ||||
b'message': b'error setting stream decoder: identity decoder ' | ||||
b'received unexpected additional values', | ||||
}) | ||||
Gregory Szorc
|
r40164 | |||
def testmultipleframes(self): | ||||
Gregory Szorc
|
r40165 | reactor = framing.clientreactor(globalui, buffersends=False) | ||
Gregory Szorc
|
r40164 | |||
request, action, meta = reactor.callcommand(b'foo', {}) | ||||
for f in meta[b'framegen']: | ||||
pass | ||||
data = b''.join(cborutil.streamencode(b'identity')) | ||||
action, meta = sendframe(reactor, | ||||
ffs(b'1 2 stream-begin stream-settings continuation %s' % | ||||
data[0:3])) | ||||
self.assertEqual(action, b'noop') | ||||
self.assertEqual(meta, {}) | ||||
action, meta = sendframe(reactor, | ||||
ffs(b'1 2 0 stream-settings eos %s' % data[3:])) | ||||
self.assertEqual(action, b'noop') | ||||
self.assertEqual(meta, {}) | ||||
Gregory Szorc
|
r40167 | def testinvalidencoder(self): | ||
reactor = framing.clientreactor(globalui, buffersends=False) | ||||
request, action, meta = reactor.callcommand(b'foo', {}) | ||||
for f in meta[b'framegen']: | ||||
pass | ||||
action, meta = sendframe(reactor, | ||||
ffs(b'1 2 stream-begin stream-settings eos cbor:b"badvalue"')) | ||||
self.assertEqual(action, b'error') | ||||
self.assertEqual(meta, { | ||||
b'message': b'error setting stream decoder: unknown stream ' | ||||
b'decoder: badvalue', | ||||
}) | ||||
def testzlibencoding(self): | ||||
reactor = framing.clientreactor(globalui, buffersends=False) | ||||
request, action, meta = reactor.callcommand(b'foo', {}) | ||||
for f in meta[b'framegen']: | ||||
pass | ||||
action, meta = sendframe(reactor, | ||||
ffs(b'%d 2 stream-begin stream-settings eos cbor:b"zlib"' % | ||||
request.requestid)) | ||||
self.assertEqual(action, b'noop') | ||||
self.assertEqual(meta, {}) | ||||
result = { | ||||
b'status': b'ok', | ||||
} | ||||
encoded = b''.join(cborutil.streamencode(result)) | ||||
compressed = zlib.compress(encoded) | ||||
self.assertEqual(zlib.decompress(compressed), encoded) | ||||
action, meta = sendframe(reactor, | ||||
ffs(b'%d 2 encoded command-response eos %s' % | ||||
(request.requestid, compressed))) | ||||
self.assertEqual(action, b'responsedata') | ||||
self.assertEqual(meta[b'data'], encoded) | ||||
def testzlibencodingsinglebyteframes(self): | ||||
reactor = framing.clientreactor(globalui, buffersends=False) | ||||
request, action, meta = reactor.callcommand(b'foo', {}) | ||||
for f in meta[b'framegen']: | ||||
pass | ||||
action, meta = sendframe(reactor, | ||||
ffs(b'%d 2 stream-begin stream-settings eos cbor:b"zlib"' % | ||||
request.requestid)) | ||||
self.assertEqual(action, b'noop') | ||||
self.assertEqual(meta, {}) | ||||
result = { | ||||
b'status': b'ok', | ||||
} | ||||
encoded = b''.join(cborutil.streamencode(result)) | ||||
compressed = zlib.compress(encoded) | ||||
self.assertEqual(zlib.decompress(compressed), encoded) | ||||
chunks = [] | ||||
for i in range(len(compressed)): | ||||
char = compressed[i:i + 1] | ||||
if char == b'\\': | ||||
char = b'\\\\' | ||||
action, meta = sendframe(reactor, | ||||
ffs(b'%d 2 encoded command-response continuation %s' % | ||||
(request.requestid, char))) | ||||
self.assertEqual(action, b'responsedata') | ||||
chunks.append(meta[b'data']) | ||||
self.assertTrue(meta[b'expectmore']) | ||||
self.assertFalse(meta[b'eos']) | ||||
# zlib will have the full data decoded at this point, even though | ||||
# we haven't flushed. | ||||
self.assertEqual(b''.join(chunks), encoded) | ||||
# End the stream for good measure. | ||||
action, meta = sendframe(reactor, | ||||
ffs(b'%d 2 stream-end command-response eos ' % request.requestid)) | ||||
self.assertEqual(action, b'responsedata') | ||||
self.assertEqual(meta[b'data'], b'') | ||||
self.assertFalse(meta[b'expectmore']) | ||||
self.assertTrue(meta[b'eos']) | ||||
def testzlibmultipleresponses(self): | ||||
# We feed in zlib compressed data on the same stream but belonging to | ||||
# 2 different requests. This tests our flushing behavior. | ||||
reactor = framing.clientreactor(globalui, buffersends=False, | ||||
hasmultiplesend=True) | ||||
request1, action, meta = reactor.callcommand(b'foo', {}) | ||||
for f in meta[b'framegen']: | ||||
pass | ||||
request2, action, meta = reactor.callcommand(b'foo', {}) | ||||
for f in meta[b'framegen']: | ||||
pass | ||||
outstream = framing.outputstream(2) | ||||
outstream.setencoder(globalui, b'zlib') | ||||
response1 = b''.join(cborutil.streamencode({ | ||||
b'status': b'ok', | ||||
b'extra': b'response1' * 10, | ||||
})) | ||||
response2 = b''.join(cborutil.streamencode({ | ||||
b'status': b'error', | ||||
b'extra': b'response2' * 10, | ||||
})) | ||||
action, meta = sendframe(reactor, | ||||
ffs(b'%d 2 stream-begin stream-settings eos cbor:b"zlib"' % | ||||
request1.requestid)) | ||||
self.assertEqual(action, b'noop') | ||||
self.assertEqual(meta, {}) | ||||
# Feeding partial data in won't get anything useful out. | ||||
action, meta = sendframe(reactor, | ||||
ffs(b'%d 2 encoded command-response continuation %s' % ( | ||||
request1.requestid, outstream.encode(response1)))) | ||||
self.assertEqual(action, b'responsedata') | ||||
self.assertEqual(meta[b'data'], b'') | ||||
# But flushing data at both ends will get our original data. | ||||
action, meta = sendframe(reactor, | ||||
ffs(b'%d 2 encoded command-response eos %s' % ( | ||||
request1.requestid, outstream.flush()))) | ||||
self.assertEqual(action, b'responsedata') | ||||
self.assertEqual(meta[b'data'], response1) | ||||
# We should be able to reuse the compressor/decompressor for the | ||||
# 2nd response. | ||||
action, meta = sendframe(reactor, | ||||
ffs(b'%d 2 encoded command-response continuation %s' % ( | ||||
request2.requestid, outstream.encode(response2)))) | ||||
self.assertEqual(action, b'responsedata') | ||||
self.assertEqual(meta[b'data'], b'') | ||||
action, meta = sendframe(reactor, | ||||
ffs(b'%d 2 encoded command-response eos %s' % ( | ||||
request2.requestid, outstream.flush()))) | ||||
self.assertEqual(action, b'responsedata') | ||||
self.assertEqual(meta[b'data'], response2) | ||||
@unittest.skipUnless(zstd, 'zstd not available') | ||||
def testzstd8mbencoding(self): | ||||
reactor = framing.clientreactor(globalui, buffersends=False) | ||||
request, action, meta = reactor.callcommand(b'foo', {}) | ||||
for f in meta[b'framegen']: | ||||
pass | ||||
action, meta = sendframe(reactor, | ||||
ffs(b'%d 2 stream-begin stream-settings eos cbor:b"zstd-8mb"' % | ||||
request.requestid)) | ||||
self.assertEqual(action, b'noop') | ||||
self.assertEqual(meta, {}) | ||||
result = { | ||||
b'status': b'ok', | ||||
} | ||||
encoded = b''.join(cborutil.streamencode(result)) | ||||
encoder = framing.zstd8mbencoder(globalui) | ||||
compressed = encoder.encode(encoded) + encoder.finish() | ||||
self.assertEqual(zstd.ZstdDecompressor().decompress( | ||||
compressed, max_output_size=len(encoded)), encoded) | ||||
action, meta = sendframe(reactor, | ||||
ffs(b'%d 2 encoded command-response eos %s' % | ||||
(request.requestid, compressed))) | ||||
self.assertEqual(action, b'responsedata') | ||||
self.assertEqual(meta[b'data'], encoded) | ||||
@unittest.skipUnless(zstd, 'zstd not available') | ||||
def testzstd8mbencodingsinglebyteframes(self): | ||||
reactor = framing.clientreactor(globalui, buffersends=False) | ||||
request, action, meta = reactor.callcommand(b'foo', {}) | ||||
for f in meta[b'framegen']: | ||||
pass | ||||
action, meta = sendframe(reactor, | ||||
ffs(b'%d 2 stream-begin stream-settings eos cbor:b"zstd-8mb"' % | ||||
request.requestid)) | ||||
self.assertEqual(action, b'noop') | ||||
self.assertEqual(meta, {}) | ||||
result = { | ||||
b'status': b'ok', | ||||
} | ||||
encoded = b''.join(cborutil.streamencode(result)) | ||||
compressed = zstd.ZstdCompressor().compress(encoded) | ||||
self.assertEqual(zstd.ZstdDecompressor().decompress(compressed), | ||||
encoded) | ||||
chunks = [] | ||||
for i in range(len(compressed)): | ||||
char = compressed[i:i + 1] | ||||
if char == b'\\': | ||||
char = b'\\\\' | ||||
action, meta = sendframe(reactor, | ||||
ffs(b'%d 2 encoded command-response continuation %s' % | ||||
(request.requestid, char))) | ||||
self.assertEqual(action, b'responsedata') | ||||
chunks.append(meta[b'data']) | ||||
self.assertTrue(meta[b'expectmore']) | ||||
self.assertFalse(meta[b'eos']) | ||||
# zstd decompressor will flush at frame boundaries. | ||||
self.assertEqual(b''.join(chunks), encoded) | ||||
# End the stream for good measure. | ||||
action, meta = sendframe(reactor, | ||||
ffs(b'%d 2 stream-end command-response eos ' % request.requestid)) | ||||
self.assertEqual(action, b'responsedata') | ||||
self.assertEqual(meta[b'data'], b'') | ||||
self.assertFalse(meta[b'expectmore']) | ||||
self.assertTrue(meta[b'eos']) | ||||
@unittest.skipUnless(zstd, 'zstd not available') | ||||
def testzstd8mbmultipleresponses(self): | ||||
# We feed in zstd compressed data on the same stream but belonging to | ||||
# 2 different requests. This tests our flushing behavior. | ||||
reactor = framing.clientreactor(globalui, buffersends=False, | ||||
hasmultiplesend=True) | ||||
request1, action, meta = reactor.callcommand(b'foo', {}) | ||||
for f in meta[b'framegen']: | ||||
pass | ||||
request2, action, meta = reactor.callcommand(b'foo', {}) | ||||
for f in meta[b'framegen']: | ||||
pass | ||||
outstream = framing.outputstream(2) | ||||
outstream.setencoder(globalui, b'zstd-8mb') | ||||
response1 = b''.join(cborutil.streamencode({ | ||||
b'status': b'ok', | ||||
b'extra': b'response1' * 10, | ||||
})) | ||||
response2 = b''.join(cborutil.streamencode({ | ||||
b'status': b'error', | ||||
b'extra': b'response2' * 10, | ||||
})) | ||||
action, meta = sendframe(reactor, | ||||
ffs(b'%d 2 stream-begin stream-settings eos cbor:b"zstd-8mb"' % | ||||
request1.requestid)) | ||||
self.assertEqual(action, b'noop') | ||||
self.assertEqual(meta, {}) | ||||
# Feeding partial data in won't get anything useful out. | ||||
action, meta = sendframe(reactor, | ||||
ffs(b'%d 2 encoded command-response continuation %s' % ( | ||||
request1.requestid, outstream.encode(response1)))) | ||||
self.assertEqual(action, b'responsedata') | ||||
self.assertEqual(meta[b'data'], b'') | ||||
# But flushing data at both ends will get our original data. | ||||
action, meta = sendframe(reactor, | ||||
ffs(b'%d 2 encoded command-response eos %s' % ( | ||||
request1.requestid, outstream.flush()))) | ||||
self.assertEqual(action, b'responsedata') | ||||
self.assertEqual(meta[b'data'], response1) | ||||
# We should be able to reuse the compressor/decompressor for the | ||||
# 2nd response. | ||||
action, meta = sendframe(reactor, | ||||
ffs(b'%d 2 encoded command-response continuation %s' % ( | ||||
request2.requestid, outstream.encode(response2)))) | ||||
self.assertEqual(action, b'responsedata') | ||||
self.assertEqual(meta[b'data'], b'') | ||||
action, meta = sendframe(reactor, | ||||
ffs(b'%d 2 encoded command-response eos %s' % ( | ||||
request2.requestid, outstream.flush()))) | ||||
self.assertEqual(action, b'responsedata') | ||||
self.assertEqual(meta[b'data'], response2) | ||||
Gregory Szorc
|
r37561 | if __name__ == '__main__': | ||
import silenttestrunner | ||||
silenttestrunner.main(__name__) | ||||