##// END OF EJS Templates
wireprotov2: define and use stream encoders...
wireprotov2: define and use stream encoders Now that we have basic support for defining stream encoding, it is time to start doing something with it. We define various classes implementing stream encoders/decoders for the defined encoding profiles. This is relatively straightforward. We teach the inputstream and outputstream classes how to encode, decode, and flush data. We then teach the clientreactor how to filter received data through the inputstream decoder. One of the features of the framing format is that streams can span requests. This is a differentiating feature from say HTTP/2, which associates streams with requests. By allowing streams to span requests, we can reuse compression context data across requests/responses. But in order to do this, we need a mechanism to "flush" the encoder at logical boundaries so that receivers receive all data where it is expected. And a "flush" event is distinct from a "finish" event from the perspective of certain compressors because a "flush" will retain compression context state whereas a "finish" operation will not. This is why encoders have both a flush() and a finish() and each uses specific flushing semantics on the underlying compressor. The added tests verify various behavior of decoders via clientreactor. These tests do test some compression behavior via use of outputstream. But for all intents and purposes, server reactor support for encoding is not yet implemented. Differential Revision: https://phab.mercurial-scm.org/D4921

File last commit:

r40167:e6752241 default
r40167:e6752241 default
Show More
test-wireproto-clientreactor.py
604 lines | 20.8 KiB | text/x-python | PythonLexer
/ tests / test-wireproto-clientreactor.py
Gregory Szorc
wireproto: introduce a reactor for client-side state...
r37561 from __future__ import absolute_import
import unittest
Gregory Szorc
wireprotov2: define and use stream encoders...
r40167 import zlib
Gregory Szorc
wireproto: introduce a reactor for client-side state...
r37561
from mercurial import (
error,
Gregory Szorc
wireprotov2: pass ui into clientreactor and serverreactor...
r40165 ui as uimod,
Gregory Szorc
wireproto: introduce a reactor for client-side state...
r37561 wireprotoframing as framing,
)
Gregory Szorc
wireprotov2: handle stream encoding settings frames...
r40164 from mercurial.utils import (
cborutil,
)
Gregory Szorc
wireproto: introduce a reactor for client-side state...
r37561
Gregory Szorc
wireprotov2: define and use stream encoders...
r40167 try:
from mercurial import zstd
zstd.__version__
except ImportError:
zstd = None
Gregory Szorc
wireproto: client reactor support for receiving frames...
# Shorthand used throughout this file to build a frame from its
# human-readable string form.
ffs = framing.makeframefromhumanstring

# A single ui instance shared by every reactor and encoder created below.
globalui = uimod.ui()
Gregory Szorc
wireproto: client reactor support for receiving frames...
def sendframe(reactor, frame):
    """Send a frame bytearray to a reactor.

    ``frame`` is split into its header and payload, the payload size is
    checked against the length recorded in the header, and an equivalent
    ``framing.frame`` instance is passed to ``reactor.onframerecv()``.

    Returns whatever ``reactor.onframerecv()`` returns; callers in this
    file unpack it as an ``(action, meta)`` pair.
    """
    header = framing.parseheader(frame)
    payload = frame[framing.FRAME_HEADER_SIZE:]
    # Catch a malformed test frame here rather than inside the reactor.
    assert len(payload) == header.length
    return reactor.onframerecv(framing.frame(header.requestid,
                                             header.streamid,
                                             header.streamflags,
                                             header.typeid,
                                             header.flags,
                                             payload))
Gregory Szorc
wireproto: introduce a reactor for client-side state...
class SingleSendTests(unittest.TestCase):
    """A reactor that can only send once rejects subsequent sends."""

    if not getattr(unittest.TestCase, 'assertRaisesRegex', False):
        # Python 3.7 deprecates the regex*p* version, but 2.7 lacks
        # the regex version.
        assertRaisesRegex = (# camelcase-required
            unittest.TestCase.assertRaisesRegexp)

    def testbasic(self):
        reactor = framing.clientreactor(globalui,
                                        hasmultiplesend=False,
                                        buffersends=True)

        # With send buffering enabled the command is only queued.
        request, action, meta = reactor.callcommand(b'foo', {})
        self.assertEqual(request.state, b'pending')
        self.assertEqual(action, b'noop')

        action, meta = reactor.flushcommands()
        self.assertEqual(action, b'sendframes')

        # The request reports "sending" while its frames are being consumed
        # and "sent" once the frame generator is exhausted.
        for frame in meta[b'framegen']:
            self.assertEqual(request.state, b'sending')

        self.assertEqual(request.state, b'sent')

        with self.assertRaisesRegex(error.ProgrammingError,
                                    'cannot issue new commands'):
            reactor.callcommand(b'foo', {})

        # A repeated attempt is rejected in the same way.
        with self.assertRaisesRegex(error.ProgrammingError,
                                    'cannot issue new commands'):
            reactor.callcommand(b'foo', {})
class NoBufferTests(unittest.TestCase):
    """A reactor without send buffering sends requests immediately."""

    def testbasic(self):
        reactor = framing.clientreactor(globalui,
                                        hasmultiplesend=True,
                                        buffersends=False)

        # Without buffering, callcommand() itself asks for frames to be sent.
        request, action, meta = reactor.callcommand(b'command1', {})
        self.assertEqual(request.requestid, 1)
        self.assertEqual(action, b'sendframes')

        self.assertEqual(request.state, b'pending')

        for frame in meta[b'framegen']:
            self.assertEqual(request.state, b'sending')

        self.assertEqual(request.state, b'sent')

        # Nothing is left queued, so flushing has nothing to do.
        action, meta = reactor.flushcommands()
        self.assertEqual(action, b'noop')

        # And we can send another command.
        request, action, meta = reactor.callcommand(b'command2', {})
        self.assertEqual(request.requestid, 3)
        self.assertEqual(action, b'sendframes')

        for frame in meta[b'framegen']:
            self.assertEqual(request.state, b'sending')

        self.assertEqual(request.state, b'sent')
Gregory Szorc
wireproto: introduce a reactor for client-side state...
r37561
Gregory Szorc
wireproto: client reactor support for receiving frames...
class BadFrameRecvTests(unittest.TestCase):
    """Received frames with bad stream IDs or frame types are rejected."""

    if not getattr(unittest.TestCase, 'assertRaisesRegex', False):
        # Python 3.7 deprecates the regex*p* version, but 2.7 lacks
        # the regex version.
        assertRaisesRegex = (# camelcase-required
            unittest.TestCase.assertRaisesRegexp)

    def testoddstream(self):
        reactor = framing.clientreactor(globalui)

        # Stream ID 1 is odd; the reactor reports an error for it.
        action, meta = sendframe(reactor, ffs(b'1 1 0 1 0 foo'))
        self.assertEqual(action, b'error')
        self.assertEqual(meta[b'message'],
                         b'received frame with odd numbered stream ID: 1')

    def testunknownstream(self):
        reactor = framing.clientreactor(globalui)

        # Stream 0 has not been seen before and the frame lacks the
        # stream-begin flag.
        action, meta = sendframe(reactor, ffs(b'1 0 0 1 0 foo'))
        self.assertEqual(action, b'error')
        self.assertEqual(meta[b'message'],
                         b'received frame on unknown stream without beginning '
                         b'of stream flag set')

    def testunhandledframetype(self):
        reactor = framing.clientreactor(globalui, buffersends=False)

        # Exhaust the frame generator so the request is fully sent.
        request, action, meta = reactor.callcommand(b'foo', {})
        for frame in meta[b'framegen']:
            pass

        with self.assertRaisesRegex(error.ProgrammingError,
                                    'unhandled frame type'):
            sendframe(reactor, ffs(b'1 0 stream-begin text-output 0 foo'))
Gregory Szorc
wireprotoframing: record when new stream is encountered...
class StreamTests(unittest.TestCase):
    """A response may arrive as several frames on the same stream."""

    def testmultipleresponseframes(self):
        reactor = framing.clientreactor(globalui, buffersends=False)

        request, action, meta = reactor.callcommand(b'foo', {})

        self.assertEqual(action, b'sendframes')
        for f in meta[b'framegen']:
            pass

        # The first response frame opens stream 0.
        action, meta = sendframe(
            reactor,
            ffs(b'%d 0 stream-begin command-response 0 foo' %
                request.requestid))
        self.assertEqual(action, b'responsedata')

        # A follow-up frame on the now-known stream needs no stream-begin.
        action, meta = sendframe(
            reactor,
            ffs(b'%d 0 0 command-response eos bar' % request.requestid))
        self.assertEqual(action, b'responsedata')
Gregory Szorc
wireprotoframing: record when new stream is encountered...
r37674
Gregory Szorc
wireprotov2: client support for advertising redirect targets...
class RedirectTests(unittest.TestCase):
    """Redirect settings passed to callcommand() appear in the request."""

    def testredirect(self):
        reactor = framing.clientreactor(globalui, buffersends=False)

        redirect = {
            b'targets': [b'a', b'b'],
            b'hashes': [b'sha256'],
        }

        request, action, meta = reactor.callcommand(
            b'foo', {}, redirect=redirect)

        self.assertEqual(action, b'sendframes')

        # The whole request, redirect data included, fits in one frame.
        frames = list(meta[b'framegen'])
        self.assertEqual(len(frames), 1)

        self.assertEqual(frames[0],
                         ffs(b'1 1 stream-begin command-request new '
                             b"cbor:{b'name': b'foo', "
                             b"b'redirect': {b'targets': [b'a', b'b'], "
                             b"b'hashes': [b'sha256']}}"))
Gregory Szorc
wireprotov2: handle stream encoding settings frames...
class StreamSettingsTests(unittest.TestCase):
    """Stream-settings frames and the stream decoders they select.

    Each test issues a command, drains its frame generator, then feeds
    stream-settings and command-response frames to the client reactor and
    checks the resulting action and metadata.
    """

    def testnoflags(self):
        # A settings frame with neither continuation nor eos is an error.
        reactor = framing.clientreactor(globalui, buffersends=False)

        request, action, meta = reactor.callcommand(b'foo', {})
        for f in meta[b'framegen']:
            pass

        action, meta = sendframe(reactor,
            ffs(b'1 2 stream-begin stream-settings 0 '))

        self.assertEqual(action, b'error')
        self.assertEqual(meta, {
            b'message': b'stream encoding settings frame must have '
                        b'continuation or end of stream flag set',
        })

    def testconflictflags(self):
        # continuation and eos together are contradictory.
        reactor = framing.clientreactor(globalui, buffersends=False)

        request, action, meta = reactor.callcommand(b'foo', {})
        for f in meta[b'framegen']:
            pass

        action, meta = sendframe(reactor,
            ffs(b'1 2 stream-begin stream-settings continuation|eos '))

        self.assertEqual(action, b'error')
        self.assertEqual(meta, {
            b'message': b'stream encoding settings frame cannot have both '
                        b'continuation and end of stream flags set',
        })

    def testemptypayload(self):
        # A final settings frame without any payload is an error.
        reactor = framing.clientreactor(globalui, buffersends=False)

        request, action, meta = reactor.callcommand(b'foo', {})
        for f in meta[b'framegen']:
            pass

        action, meta = sendframe(reactor,
            ffs(b'1 2 stream-begin stream-settings eos '))

        self.assertEqual(action, b'error')
        self.assertEqual(meta, {
            b'message': b'stream encoding settings frame did not contain '
                        b'CBOR data'
        })

    def testbadcbor(self):
        # The payload is sent as the raw bytes "badvalue", not as a "cbor:"
        # encoded value. Only the error action is checked, not the message.
        reactor = framing.clientreactor(globalui, buffersends=False)

        request, action, meta = reactor.callcommand(b'foo', {})
        for f in meta[b'framegen']:
            pass

        action, meta = sendframe(reactor,
            ffs(b'1 2 stream-begin stream-settings eos badvalue'))

        self.assertEqual(action, b'error')

    def testsingleobject(self):
        # A single CBOR value naming the identity encoding is accepted.
        reactor = framing.clientreactor(globalui, buffersends=False)

        request, action, meta = reactor.callcommand(b'foo', {})
        for f in meta[b'framegen']:
            pass

        action, meta = sendframe(reactor,
            ffs(b'1 2 stream-begin stream-settings eos cbor:b"identity"'))

        self.assertEqual(action, b'noop')
        self.assertEqual(meta, {})

    def testmultipleobjects(self):
        # A second CBOR value following b'identity' is rejected.
        reactor = framing.clientreactor(globalui, buffersends=False)

        request, action, meta = reactor.callcommand(b'foo', {})
        for f in meta[b'framegen']:
            pass

        data = b''.join([
            b''.join(cborutil.streamencode(b'identity')),
            b''.join(cborutil.streamencode({b'foo', b'bar'})),
        ])

        action, meta = sendframe(reactor,
            ffs(b'1 2 stream-begin stream-settings eos %s' % data))

        self.assertEqual(action, b'error')
        self.assertEqual(meta, {
            b'message': b'error setting stream decoder: identity decoder '
                        b'received unexpected additional values',
        })

    def testmultipleframes(self):
        # The settings payload may be split across a continuation frame
        # and a final eos frame.
        reactor = framing.clientreactor(globalui, buffersends=False)

        request, action, meta = reactor.callcommand(b'foo', {})
        for f in meta[b'framegen']:
            pass

        data = b''.join(cborutil.streamencode(b'identity'))

        action, meta = sendframe(reactor,
            ffs(b'1 2 stream-begin stream-settings continuation %s' %
                data[0:3]))

        self.assertEqual(action, b'noop')
        self.assertEqual(meta, {})

        action, meta = sendframe(reactor,
            ffs(b'1 2 0 stream-settings eos %s' % data[3:]))

        self.assertEqual(action, b'noop')
        self.assertEqual(meta, {})

    def testinvalidencoder(self):
        # An unrecognized encoding name is reported as an error.
        reactor = framing.clientreactor(globalui, buffersends=False)

        request, action, meta = reactor.callcommand(b'foo', {})
        for f in meta[b'framegen']:
            pass

        action, meta = sendframe(reactor,
            ffs(b'1 2 stream-begin stream-settings eos cbor:b"badvalue"'))

        self.assertEqual(action, b'error')
        self.assertEqual(meta, {
            b'message': b'error setting stream decoder: unknown stream '
                        b'decoder: badvalue',
        })

    def testzlibencoding(self):
        # A complete zlib-compressed response arrives in a single frame.
        reactor = framing.clientreactor(globalui, buffersends=False)

        request, action, meta = reactor.callcommand(b'foo', {})
        for f in meta[b'framegen']:
            pass

        action, meta = sendframe(reactor,
            ffs(b'%d 2 stream-begin stream-settings eos cbor:b"zlib"' %
                request.requestid))

        self.assertEqual(action, b'noop')
        self.assertEqual(meta, {})

        result = {
            b'status': b'ok',
        }
        encoded = b''.join(cborutil.streamencode(result))

        compressed = zlib.compress(encoded)
        # Sanity check the fixture before handing it to the reactor.
        self.assertEqual(zlib.decompress(compressed), encoded)

        action, meta = sendframe(reactor,
            ffs(b'%d 2 encoded command-response eos %s' %
                (request.requestid, compressed)))

        self.assertEqual(action, b'responsedata')
        self.assertEqual(meta[b'data'], encoded)

    def testzlibencodingsinglebyteframes(self):
        # The zlib-compressed response arrives one byte per frame.
        reactor = framing.clientreactor(globalui, buffersends=False)

        request, action, meta = reactor.callcommand(b'foo', {})
        for f in meta[b'framegen']:
            pass

        action, meta = sendframe(reactor,
            ffs(b'%d 2 stream-begin stream-settings eos cbor:b"zlib"' %
                request.requestid))

        self.assertEqual(action, b'noop')
        self.assertEqual(meta, {})

        result = {
            b'status': b'ok',
        }
        encoded = b''.join(cborutil.streamencode(result))

        compressed = zlib.compress(encoded)
        self.assertEqual(zlib.decompress(compressed), encoded)

        chunks = []

        # Slice rather than index so each piece stays a bytes object on
        # both Python 2 and Python 3.
        for i in range(len(compressed)):
            char = compressed[i:i + 1]
            # A lone backslash is doubled before being embedded in the
            # human-readable frame string (presumably because ffs()
            # unescapes the payload -- confirm against wireprotoframing).
            if char == b'\\':
                char = b'\\\\'
            action, meta = sendframe(reactor,
                ffs(b'%d 2 encoded command-response continuation %s' %
                    (request.requestid, char)))

            self.assertEqual(action, b'responsedata')
            chunks.append(meta[b'data'])
            self.assertTrue(meta[b'expectmore'])
            self.assertFalse(meta[b'eos'])

        # zlib will have the full data decoded at this point, even though
        # we haven't flushed.
        self.assertEqual(b''.join(chunks), encoded)

        # End the stream for good measure.
        action, meta = sendframe(reactor,
            ffs(b'%d 2 stream-end command-response eos ' % request.requestid))

        self.assertEqual(action, b'responsedata')
        self.assertEqual(meta[b'data'], b'')
        self.assertFalse(meta[b'expectmore'])
        self.assertTrue(meta[b'eos'])

    def testzlibmultipleresponses(self):
        # We feed in zlib compressed data on the same stream but belonging to
        # 2 different requests. This tests our flushing behavior.
        reactor = framing.clientreactor(globalui, buffersends=False,
                                        hasmultiplesend=True)

        request1, action, meta = reactor.callcommand(b'foo', {})
        for f in meta[b'framegen']:
            pass

        request2, action, meta = reactor.callcommand(b'foo', {})
        for f in meta[b'framegen']:
            pass

        # One output stream (and so one encoder) produces both responses.
        outstream = framing.outputstream(2)
        outstream.setencoder(globalui, b'zlib')

        response1 = b''.join(cborutil.streamencode({
            b'status': b'ok',
            b'extra': b'response1' * 10,
        }))

        response2 = b''.join(cborutil.streamencode({
            b'status': b'error',
            b'extra': b'response2' * 10,
        }))

        action, meta = sendframe(reactor,
            ffs(b'%d 2 stream-begin stream-settings eos cbor:b"zlib"' %
                request1.requestid))

        self.assertEqual(action, b'noop')
        self.assertEqual(meta, {})

        # Feeding partial data in won't get anything useful out.
        action, meta = sendframe(reactor,
            ffs(b'%d 2 encoded command-response continuation %s' % (
                request1.requestid, outstream.encode(response1))))
        self.assertEqual(action, b'responsedata')
        self.assertEqual(meta[b'data'], b'')

        # But flushing data at both ends will get our original data.
        action, meta = sendframe(reactor,
            ffs(b'%d 2 encoded command-response eos %s' % (
                request1.requestid, outstream.flush())))
        self.assertEqual(action, b'responsedata')
        self.assertEqual(meta[b'data'], response1)

        # We should be able to reuse the compressor/decompressor for the
        # 2nd response.
        action, meta = sendframe(reactor,
            ffs(b'%d 2 encoded command-response continuation %s' % (
                request2.requestid, outstream.encode(response2))))
        self.assertEqual(action, b'responsedata')
        self.assertEqual(meta[b'data'], b'')

        action, meta = sendframe(reactor,
            ffs(b'%d 2 encoded command-response eos %s' % (
                request2.requestid, outstream.flush())))
        self.assertEqual(action, b'responsedata')
        self.assertEqual(meta[b'data'], response2)

    @unittest.skipUnless(zstd, 'zstd not available')
    def testzstd8mbencoding(self):
        # A complete zstd-8mb compressed response arrives in a single frame.
        reactor = framing.clientreactor(globalui, buffersends=False)

        request, action, meta = reactor.callcommand(b'foo', {})
        for f in meta[b'framegen']:
            pass

        action, meta = sendframe(reactor,
            ffs(b'%d 2 stream-begin stream-settings eos cbor:b"zstd-8mb"' %
                request.requestid))

        self.assertEqual(action, b'noop')
        self.assertEqual(meta, {})

        result = {
            b'status': b'ok',
        }
        encoded = b''.join(cborutil.streamencode(result))

        encoder = framing.zstd8mbencoder(globalui)
        compressed = encoder.encode(encoded) + encoder.finish()
        # Sanity check the fixture before handing it to the reactor.
        self.assertEqual(zstd.ZstdDecompressor().decompress(
            compressed, max_output_size=len(encoded)), encoded)

        action, meta = sendframe(reactor,
            ffs(b'%d 2 encoded command-response eos %s' %
                (request.requestid, compressed)))

        self.assertEqual(action, b'responsedata')
        self.assertEqual(meta[b'data'], encoded)

    @unittest.skipUnless(zstd, 'zstd not available')
    def testzstd8mbencodingsinglebyteframes(self):
        # The zstd compressed response arrives one byte per frame.
        reactor = framing.clientreactor(globalui, buffersends=False)

        request, action, meta = reactor.callcommand(b'foo', {})
        for f in meta[b'framegen']:
            pass

        action, meta = sendframe(reactor,
            ffs(b'%d 2 stream-begin stream-settings eos cbor:b"zstd-8mb"' %
                request.requestid))

        self.assertEqual(action, b'noop')
        self.assertEqual(meta, {})

        result = {
            b'status': b'ok',
        }
        encoded = b''.join(cborutil.streamencode(result))

        compressed = zstd.ZstdCompressor().compress(encoded)
        self.assertEqual(zstd.ZstdDecompressor().decompress(compressed),
                         encoded)

        chunks = []

        # Slice rather than index so each piece stays a bytes object on
        # both Python 2 and Python 3.
        for i in range(len(compressed)):
            char = compressed[i:i + 1]
            # A lone backslash is doubled before being embedded in the
            # human-readable frame string (presumably because ffs()
            # unescapes the payload -- confirm against wireprotoframing).
            if char == b'\\':
                char = b'\\\\'
            action, meta = sendframe(reactor,
                ffs(b'%d 2 encoded command-response continuation %s' %
                    (request.requestid, char)))

            self.assertEqual(action, b'responsedata')
            chunks.append(meta[b'data'])
            self.assertTrue(meta[b'expectmore'])
            self.assertFalse(meta[b'eos'])

        # zstd decompressor will flush at frame boundaries.
        self.assertEqual(b''.join(chunks), encoded)

        # End the stream for good measure.
        action, meta = sendframe(reactor,
            ffs(b'%d 2 stream-end command-response eos ' % request.requestid))

        self.assertEqual(action, b'responsedata')
        self.assertEqual(meta[b'data'], b'')
        self.assertFalse(meta[b'expectmore'])
        self.assertTrue(meta[b'eos'])

    @unittest.skipUnless(zstd, 'zstd not available')
    def testzstd8mbmultipleresponses(self):
        # We feed in zstd compressed data on the same stream but belonging to
        # 2 different requests. This tests our flushing behavior.
        reactor = framing.clientreactor(globalui, buffersends=False,
                                        hasmultiplesend=True)

        request1, action, meta = reactor.callcommand(b'foo', {})
        for f in meta[b'framegen']:
            pass

        request2, action, meta = reactor.callcommand(b'foo', {})
        for f in meta[b'framegen']:
            pass

        # One output stream (and so one encoder) produces both responses.
        outstream = framing.outputstream(2)
        outstream.setencoder(globalui, b'zstd-8mb')

        response1 = b''.join(cborutil.streamencode({
            b'status': b'ok',
            b'extra': b'response1' * 10,
        }))

        response2 = b''.join(cborutil.streamencode({
            b'status': b'error',
            b'extra': b'response2' * 10,
        }))

        action, meta = sendframe(reactor,
            ffs(b'%d 2 stream-begin stream-settings eos cbor:b"zstd-8mb"' %
                request1.requestid))

        self.assertEqual(action, b'noop')
        self.assertEqual(meta, {})

        # Feeding partial data in won't get anything useful out.
        action, meta = sendframe(reactor,
            ffs(b'%d 2 encoded command-response continuation %s' % (
                request1.requestid, outstream.encode(response1))))
        self.assertEqual(action, b'responsedata')
        self.assertEqual(meta[b'data'], b'')

        # But flushing data at both ends will get our original data.
        action, meta = sendframe(reactor,
            ffs(b'%d 2 encoded command-response eos %s' % (
                request1.requestid, outstream.flush())))
        self.assertEqual(action, b'responsedata')
        self.assertEqual(meta[b'data'], response1)

        # We should be able to reuse the compressor/decompressor for the
        # 2nd response.
        action, meta = sendframe(reactor,
            ffs(b'%d 2 encoded command-response continuation %s' % (
                request2.requestid, outstream.encode(response2))))
        self.assertEqual(action, b'responsedata')
        self.assertEqual(meta[b'data'], b'')

        action, meta = sendframe(reactor,
            ffs(b'%d 2 encoded command-response eos %s' % (
                request2.requestid, outstream.flush())))
        self.assertEqual(action, b'responsedata')
        self.assertEqual(meta[b'data'], response2)
Gregory Szorc
wireproto: introduce a reactor for client-side state...
if __name__ == '__main__':
    # Imported only when run as a script, then used to run this module's
    # tests.
    import silenttestrunner
    silenttestrunner.main(__name__)