##// END OF EJS Templates
tests: configure fsmonitor.mode=paranoid always if fsmonitor is used...
tests: configure fsmonitor.mode=paranoid always if fsmonitor is used This forces fsmonitor extension execute "paranoid" code path. Strict speaking, we should make fsmonitor-run-tests.py accept own specific options, but there is no code path, which is disabled in "paranoid" mode, at least now. Therefore, this solution seems reasonable enough.

File last commit:

r40167:e6752241 default
r40244:b7ba1cfb default
Show More
test-wireproto-clientreactor.py
604 lines | 20.8 KiB | text/x-python | PythonLexer
/ tests / test-wireproto-clientreactor.py
Gregory Szorc
wireproto: introduce a reactor for client-side state...
r37561 from __future__ import absolute_import
import unittest
Gregory Szorc
wireprotov2: define and use stream encoders...
r40167 import zlib
Gregory Szorc
wireproto: introduce a reactor for client-side state...
r37561
from mercurial import (
error,
Gregory Szorc
wireprotov2: pass ui into clientreactor and serverreactor...
r40165 ui as uimod,
Gregory Szorc
wireproto: introduce a reactor for client-side state...
r37561 wireprotoframing as framing,
)
Gregory Szorc
wireprotov2: handle stream encoding settings frames...
r40164 from mercurial.utils import (
cborutil,
)
Gregory Szorc
wireproto: introduce a reactor for client-side state...
r37561
Gregory Szorc
wireprotov2: define and use stream encoders...
r40167 try:
from mercurial import zstd
zstd.__version__
except ImportError:
zstd = None
Gregory Szorc
wireproto: client reactor support for receiving frames...
r37562 ffs = framing.makeframefromhumanstring
Gregory Szorc
wireprotov2: pass ui into clientreactor and serverreactor...
r40165 globalui = uimod.ui()
Gregory Szorc
wireproto: client reactor support for receiving frames...
r37562 def sendframe(reactor, frame):
"""Send a frame bytearray to a reactor."""
header = framing.parseheader(frame)
payload = frame[framing.FRAME_HEADER_SIZE:]
assert len(payload) == header.length
return reactor.onframerecv(framing.frame(header.requestid,
header.streamid,
header.streamflags,
header.typeid,
header.flags,
payload))
Gregory Szorc
wireproto: introduce a reactor for client-side state...
r37561 class SingleSendTests(unittest.TestCase):
"""A reactor that can only send once rejects subsequent sends."""
Augie Fackler
cleanup: polyfill assertRaisesRegex so we can avoid assertRaisesRegexp...
r37733
if not getattr(unittest.TestCase, 'assertRaisesRegex', False):
# Python 3.7 deprecates the regex*p* version, but 2.7 lacks
# the regex version.
assertRaisesRegex = (# camelcase-required
unittest.TestCase.assertRaisesRegexp)
Gregory Szorc
wireproto: introduce a reactor for client-side state...
r37561 def testbasic(self):
Gregory Szorc
wireprotov2: pass ui into clientreactor and serverreactor...
r40165 reactor = framing.clientreactor(globalui,
hasmultiplesend=False,
buffersends=True)
Gregory Szorc
wireproto: introduce a reactor for client-side state...
r37561
request, action, meta = reactor.callcommand(b'foo', {})
Augie Fackler
tests: add all missing b prefixes in reactor tests...
r37700 self.assertEqual(request.state, b'pending')
self.assertEqual(action, b'noop')
Gregory Szorc
wireproto: introduce a reactor for client-side state...
r37561
action, meta = reactor.flushcommands()
Augie Fackler
tests: add all missing b prefixes in reactor tests...
r37700 self.assertEqual(action, b'sendframes')
Gregory Szorc
wireproto: introduce a reactor for client-side state...
r37561
Augie Fackler
tests: add all missing b prefixes in reactor tests...
r37700 for frame in meta[b'framegen']:
self.assertEqual(request.state, b'sending')
Gregory Szorc
wireproto: introduce a reactor for client-side state...
r37561
Augie Fackler
tests: add all missing b prefixes in reactor tests...
r37700 self.assertEqual(request.state, b'sent')
Gregory Szorc
wireproto: introduce a reactor for client-side state...
r37561
Augie Fackler
cleanup: polyfill assertRaisesRegex so we can avoid assertRaisesRegexp...
r37733 with self.assertRaisesRegex(error.ProgrammingError,
Gregory Szorc
wireproto: introduce a reactor for client-side state...
r37561 'cannot issue new commands'):
reactor.callcommand(b'foo', {})
Augie Fackler
cleanup: polyfill assertRaisesRegex so we can avoid assertRaisesRegexp...
r37733 with self.assertRaisesRegex(error.ProgrammingError,
Gregory Szorc
wireproto: introduce a reactor for client-side state...
r37561 'cannot issue new commands'):
reactor.callcommand(b'foo', {})
class NoBufferTests(unittest.TestCase):
"""A reactor without send buffering sends requests immediately."""
def testbasic(self):
Gregory Szorc
wireprotov2: pass ui into clientreactor and serverreactor...
r40165 reactor = framing.clientreactor(globalui,
hasmultiplesend=True,
buffersends=False)
Gregory Szorc
wireproto: introduce a reactor for client-side state...
r37561
request, action, meta = reactor.callcommand(b'command1', {})
self.assertEqual(request.requestid, 1)
Augie Fackler
tests: add all missing b prefixes in reactor tests...
r37700 self.assertEqual(action, b'sendframes')
Gregory Szorc
wireproto: introduce a reactor for client-side state...
r37561
Augie Fackler
tests: add all missing b prefixes in reactor tests...
r37700 self.assertEqual(request.state, b'pending')
Gregory Szorc
wireproto: introduce a reactor for client-side state...
r37561
Augie Fackler
tests: add all missing b prefixes in reactor tests...
r37700 for frame in meta[b'framegen']:
self.assertEqual(request.state, b'sending')
Gregory Szorc
wireproto: introduce a reactor for client-side state...
r37561
Augie Fackler
tests: add all missing b prefixes in reactor tests...
r37700 self.assertEqual(request.state, b'sent')
Gregory Szorc
wireproto: introduce a reactor for client-side state...
r37561
action, meta = reactor.flushcommands()
Augie Fackler
tests: add all missing b prefixes in reactor tests...
r37700 self.assertEqual(action, b'noop')
Gregory Szorc
wireproto: introduce a reactor for client-side state...
r37561
# And we can send another command.
request, action, meta = reactor.callcommand(b'command2', {})
self.assertEqual(request.requestid, 3)
Augie Fackler
tests: add all missing b prefixes in reactor tests...
r37700 self.assertEqual(action, b'sendframes')
Gregory Szorc
wireproto: introduce a reactor for client-side state...
r37561
Augie Fackler
tests: add all missing b prefixes in reactor tests...
r37700 for frame in meta[b'framegen']:
self.assertEqual(request.state, b'sending')
Gregory Szorc
wireproto: introduce a reactor for client-side state...
r37561
Augie Fackler
tests: add all missing b prefixes in reactor tests...
r37700 self.assertEqual(request.state, b'sent')
Gregory Szorc
wireproto: introduce a reactor for client-side state...
r37561
Gregory Szorc
wireproto: client reactor support for receiving frames...
r37562 class BadFrameRecvTests(unittest.TestCase):
Augie Fackler
cleanup: polyfill assertRaisesRegex so we can avoid assertRaisesRegexp...
r37733 if not getattr(unittest.TestCase, 'assertRaisesRegex', False):
# Python 3.7 deprecates the regex*p* version, but 2.7 lacks
# the regex version.
assertRaisesRegex = (# camelcase-required
unittest.TestCase.assertRaisesRegexp)
Gregory Szorc
wireproto: client reactor support for receiving frames...
r37562 def testoddstream(self):
Gregory Szorc
wireprotov2: pass ui into clientreactor and serverreactor...
r40165 reactor = framing.clientreactor(globalui)
Gregory Szorc
wireproto: client reactor support for receiving frames...
r37562
action, meta = sendframe(reactor, ffs(b'1 1 0 1 0 foo'))
Augie Fackler
tests: add all missing b prefixes in reactor tests...
r37700 self.assertEqual(action, b'error')
self.assertEqual(meta[b'message'],
b'received frame with odd numbered stream ID: 1')
Gregory Szorc
wireproto: client reactor support for receiving frames...
r37562
def testunknownstream(self):
Gregory Szorc
wireprotov2: pass ui into clientreactor and serverreactor...
r40165 reactor = framing.clientreactor(globalui)
Gregory Szorc
wireproto: client reactor support for receiving frames...
r37562
action, meta = sendframe(reactor, ffs(b'1 0 0 1 0 foo'))
Augie Fackler
tests: add all missing b prefixes in reactor tests...
r37700 self.assertEqual(action, b'error')
self.assertEqual(meta[b'message'],
b'received frame on unknown stream without beginning '
b'of stream flag set')
Gregory Szorc
wireproto: client reactor support for receiving frames...
r37562
def testunhandledframetype(self):
Gregory Szorc
wireprotov2: pass ui into clientreactor and serverreactor...
r40165 reactor = framing.clientreactor(globalui, buffersends=False)
Gregory Szorc
wireproto: client reactor support for receiving frames...
r37562
request, action, meta = reactor.callcommand(b'foo', {})
Augie Fackler
tests: add all missing b prefixes in reactor tests...
r37700 for frame in meta[b'framegen']:
Gregory Szorc
wireproto: client reactor support for receiving frames...
r37562 pass
Augie Fackler
cleanup: polyfill assertRaisesRegex so we can avoid assertRaisesRegexp...
r37733 with self.assertRaisesRegex(error.ProgrammingError,
Gregory Szorc
wireproto: client reactor support for receiving frames...
r37562 'unhandled frame type'):
sendframe(reactor, ffs(b'1 0 stream-begin text-output 0 foo'))
Gregory Szorc
wireprotoframing: record when new stream is encountered...
r37674 class StreamTests(unittest.TestCase):
def testmultipleresponseframes(self):
Gregory Szorc
wireprotov2: pass ui into clientreactor and serverreactor...
r40165 reactor = framing.clientreactor(globalui, buffersends=False)
Gregory Szorc
wireprotoframing: record when new stream is encountered...
r37674
request, action, meta = reactor.callcommand(b'foo', {})
Augie Fackler
tests: add all missing b prefixes in reactor tests...
r37700 self.assertEqual(action, b'sendframes')
for f in meta[b'framegen']:
Gregory Szorc
wireprotoframing: record when new stream is encountered...
r37674 pass
action, meta = sendframe(
reactor,
Gregory Szorc
wireprotov2: change frame type and name for command response...
r37742 ffs(b'%d 0 stream-begin command-response 0 foo' %
request.requestid))
Augie Fackler
tests: add all missing b prefixes in reactor tests...
r37700 self.assertEqual(action, b'responsedata')
Gregory Szorc
wireprotoframing: record when new stream is encountered...
r37674
action, meta = sendframe(
reactor,
Gregory Szorc
wireprotov2: change frame type and name for command response...
r37742 ffs(b'%d 0 0 command-response eos bar' % request.requestid))
Augie Fackler
tests: add all missing b prefixes in reactor tests...
r37700 self.assertEqual(action, b'responsedata')
Gregory Szorc
wireprotoframing: record when new stream is encountered...
r37674
Gregory Szorc
wireprotov2: client support for advertising redirect targets...
r40060 class RedirectTests(unittest.TestCase):
def testredirect(self):
Gregory Szorc
wireprotov2: pass ui into clientreactor and serverreactor...
r40165 reactor = framing.clientreactor(globalui, buffersends=False)
Gregory Szorc
wireprotov2: client support for advertising redirect targets...
r40060
redirect = {
b'targets': [b'a', b'b'],
b'hashes': [b'sha256'],
}
request, action, meta = reactor.callcommand(
b'foo', {}, redirect=redirect)
self.assertEqual(action, b'sendframes')
frames = list(meta[b'framegen'])
self.assertEqual(len(frames), 1)
self.assertEqual(frames[0],
ffs(b'1 1 stream-begin command-request new '
b"cbor:{b'name': b'foo', "
b"b'redirect': {b'targets': [b'a', b'b'], "
b"b'hashes': [b'sha256']}}"))
Gregory Szorc
wireprotov2: handle stream encoding settings frames...
r40164 class StreamSettingsTests(unittest.TestCase):
def testnoflags(self):
Gregory Szorc
wireprotov2: pass ui into clientreactor and serverreactor...
r40165 reactor = framing.clientreactor(globalui, buffersends=False)
Gregory Szorc
wireprotov2: handle stream encoding settings frames...
r40164
request, action, meta = reactor.callcommand(b'foo', {})
for f in meta[b'framegen']:
pass
action, meta = sendframe(reactor,
ffs(b'1 2 stream-begin stream-settings 0 '))
self.assertEqual(action, b'error')
self.assertEqual(meta, {
b'message': b'stream encoding settings frame must have '
b'continuation or end of stream flag set',
})
def testconflictflags(self):
Gregory Szorc
wireprotov2: pass ui into clientreactor and serverreactor...
r40165 reactor = framing.clientreactor(globalui, buffersends=False)
Gregory Szorc
wireprotov2: handle stream encoding settings frames...
r40164
request, action, meta = reactor.callcommand(b'foo', {})
for f in meta[b'framegen']:
pass
action, meta = sendframe(reactor,
ffs(b'1 2 stream-begin stream-settings continuation|eos '))
self.assertEqual(action, b'error')
self.assertEqual(meta, {
b'message': b'stream encoding settings frame cannot have both '
b'continuation and end of stream flags set',
})
def testemptypayload(self):
Gregory Szorc
wireprotov2: pass ui into clientreactor and serverreactor...
r40165 reactor = framing.clientreactor(globalui, buffersends=False)
Gregory Szorc
wireprotov2: handle stream encoding settings frames...
r40164
request, action, meta = reactor.callcommand(b'foo', {})
for f in meta[b'framegen']:
pass
action, meta = sendframe(reactor,
ffs(b'1 2 stream-begin stream-settings eos '))
self.assertEqual(action, b'error')
self.assertEqual(meta, {
b'message': b'stream encoding settings frame did not contain '
b'CBOR data'
})
def testbadcbor(self):
Gregory Szorc
wireprotov2: pass ui into clientreactor and serverreactor...
r40165 reactor = framing.clientreactor(globalui, buffersends=False)
Gregory Szorc
wireprotov2: handle stream encoding settings frames...
r40164
request, action, meta = reactor.callcommand(b'foo', {})
for f in meta[b'framegen']:
pass
action, meta = sendframe(reactor,
ffs(b'1 2 stream-begin stream-settings eos badvalue'))
self.assertEqual(action, b'error')
def testsingleobject(self):
Gregory Szorc
wireprotov2: pass ui into clientreactor and serverreactor...
r40165 reactor = framing.clientreactor(globalui, buffersends=False)
Gregory Szorc
wireprotov2: handle stream encoding settings frames...
r40164
request, action, meta = reactor.callcommand(b'foo', {})
for f in meta[b'framegen']:
pass
action, meta = sendframe(reactor,
ffs(b'1 2 stream-begin stream-settings eos cbor:b"identity"'))
self.assertEqual(action, b'noop')
self.assertEqual(meta, {})
def testmultipleobjects(self):
Gregory Szorc
wireprotov2: pass ui into clientreactor and serverreactor...
r40165 reactor = framing.clientreactor(globalui, buffersends=False)
Gregory Szorc
wireprotov2: handle stream encoding settings frames...
r40164
request, action, meta = reactor.callcommand(b'foo', {})
for f in meta[b'framegen']:
pass
data = b''.join([
b''.join(cborutil.streamencode(b'identity')),
b''.join(cborutil.streamencode({b'foo', b'bar'})),
])
action, meta = sendframe(reactor,
ffs(b'1 2 stream-begin stream-settings eos %s' % data))
Gregory Szorc
wireprotov2: define and use stream encoders...
r40167 self.assertEqual(action, b'error')
self.assertEqual(meta, {
b'message': b'error setting stream decoder: identity decoder '
b'received unexpected additional values',
})
Gregory Szorc
wireprotov2: handle stream encoding settings frames...
r40164
def testmultipleframes(self):
Gregory Szorc
wireprotov2: pass ui into clientreactor and serverreactor...
r40165 reactor = framing.clientreactor(globalui, buffersends=False)
Gregory Szorc
wireprotov2: handle stream encoding settings frames...
r40164
request, action, meta = reactor.callcommand(b'foo', {})
for f in meta[b'framegen']:
pass
data = b''.join(cborutil.streamencode(b'identity'))
action, meta = sendframe(reactor,
ffs(b'1 2 stream-begin stream-settings continuation %s' %
data[0:3]))
self.assertEqual(action, b'noop')
self.assertEqual(meta, {})
action, meta = sendframe(reactor,
ffs(b'1 2 0 stream-settings eos %s' % data[3:]))
self.assertEqual(action, b'noop')
self.assertEqual(meta, {})
Gregory Szorc
wireprotov2: define and use stream encoders...
r40167 def testinvalidencoder(self):
reactor = framing.clientreactor(globalui, buffersends=False)
request, action, meta = reactor.callcommand(b'foo', {})
for f in meta[b'framegen']:
pass
action, meta = sendframe(reactor,
ffs(b'1 2 stream-begin stream-settings eos cbor:b"badvalue"'))
self.assertEqual(action, b'error')
self.assertEqual(meta, {
b'message': b'error setting stream decoder: unknown stream '
b'decoder: badvalue',
})
def testzlibencoding(self):
reactor = framing.clientreactor(globalui, buffersends=False)
request, action, meta = reactor.callcommand(b'foo', {})
for f in meta[b'framegen']:
pass
action, meta = sendframe(reactor,
ffs(b'%d 2 stream-begin stream-settings eos cbor:b"zlib"' %
request.requestid))
self.assertEqual(action, b'noop')
self.assertEqual(meta, {})
result = {
b'status': b'ok',
}
encoded = b''.join(cborutil.streamencode(result))
compressed = zlib.compress(encoded)
self.assertEqual(zlib.decompress(compressed), encoded)
action, meta = sendframe(reactor,
ffs(b'%d 2 encoded command-response eos %s' %
(request.requestid, compressed)))
self.assertEqual(action, b'responsedata')
self.assertEqual(meta[b'data'], encoded)
def testzlibencodingsinglebyteframes(self):
reactor = framing.clientreactor(globalui, buffersends=False)
request, action, meta = reactor.callcommand(b'foo', {})
for f in meta[b'framegen']:
pass
action, meta = sendframe(reactor,
ffs(b'%d 2 stream-begin stream-settings eos cbor:b"zlib"' %
request.requestid))
self.assertEqual(action, b'noop')
self.assertEqual(meta, {})
result = {
b'status': b'ok',
}
encoded = b''.join(cborutil.streamencode(result))
compressed = zlib.compress(encoded)
self.assertEqual(zlib.decompress(compressed), encoded)
chunks = []
for i in range(len(compressed)):
char = compressed[i:i + 1]
if char == b'\\':
char = b'\\\\'
action, meta = sendframe(reactor,
ffs(b'%d 2 encoded command-response continuation %s' %
(request.requestid, char)))
self.assertEqual(action, b'responsedata')
chunks.append(meta[b'data'])
self.assertTrue(meta[b'expectmore'])
self.assertFalse(meta[b'eos'])
# zlib will have the full data decoded at this point, even though
# we haven't flushed.
self.assertEqual(b''.join(chunks), encoded)
# End the stream for good measure.
action, meta = sendframe(reactor,
ffs(b'%d 2 stream-end command-response eos ' % request.requestid))
self.assertEqual(action, b'responsedata')
self.assertEqual(meta[b'data'], b'')
self.assertFalse(meta[b'expectmore'])
self.assertTrue(meta[b'eos'])
def testzlibmultipleresponses(self):
# We feed in zlib compressed data on the same stream but belonging to
# 2 different requests. This tests our flushing behavior.
reactor = framing.clientreactor(globalui, buffersends=False,
hasmultiplesend=True)
request1, action, meta = reactor.callcommand(b'foo', {})
for f in meta[b'framegen']:
pass
request2, action, meta = reactor.callcommand(b'foo', {})
for f in meta[b'framegen']:
pass
outstream = framing.outputstream(2)
outstream.setencoder(globalui, b'zlib')
response1 = b''.join(cborutil.streamencode({
b'status': b'ok',
b'extra': b'response1' * 10,
}))
response2 = b''.join(cborutil.streamencode({
b'status': b'error',
b'extra': b'response2' * 10,
}))
action, meta = sendframe(reactor,
ffs(b'%d 2 stream-begin stream-settings eos cbor:b"zlib"' %
request1.requestid))
self.assertEqual(action, b'noop')
self.assertEqual(meta, {})
# Feeding partial data in won't get anything useful out.
action, meta = sendframe(reactor,
ffs(b'%d 2 encoded command-response continuation %s' % (
request1.requestid, outstream.encode(response1))))
self.assertEqual(action, b'responsedata')
self.assertEqual(meta[b'data'], b'')
# But flushing data at both ends will get our original data.
action, meta = sendframe(reactor,
ffs(b'%d 2 encoded command-response eos %s' % (
request1.requestid, outstream.flush())))
self.assertEqual(action, b'responsedata')
self.assertEqual(meta[b'data'], response1)
# We should be able to reuse the compressor/decompressor for the
# 2nd response.
action, meta = sendframe(reactor,
ffs(b'%d 2 encoded command-response continuation %s' % (
request2.requestid, outstream.encode(response2))))
self.assertEqual(action, b'responsedata')
self.assertEqual(meta[b'data'], b'')
action, meta = sendframe(reactor,
ffs(b'%d 2 encoded command-response eos %s' % (
request2.requestid, outstream.flush())))
self.assertEqual(action, b'responsedata')
self.assertEqual(meta[b'data'], response2)
@unittest.skipUnless(zstd, 'zstd not available')
def testzstd8mbencoding(self):
reactor = framing.clientreactor(globalui, buffersends=False)
request, action, meta = reactor.callcommand(b'foo', {})
for f in meta[b'framegen']:
pass
action, meta = sendframe(reactor,
ffs(b'%d 2 stream-begin stream-settings eos cbor:b"zstd-8mb"' %
request.requestid))
self.assertEqual(action, b'noop')
self.assertEqual(meta, {})
result = {
b'status': b'ok',
}
encoded = b''.join(cborutil.streamencode(result))
encoder = framing.zstd8mbencoder(globalui)
compressed = encoder.encode(encoded) + encoder.finish()
self.assertEqual(zstd.ZstdDecompressor().decompress(
compressed, max_output_size=len(encoded)), encoded)
action, meta = sendframe(reactor,
ffs(b'%d 2 encoded command-response eos %s' %
(request.requestid, compressed)))
self.assertEqual(action, b'responsedata')
self.assertEqual(meta[b'data'], encoded)
@unittest.skipUnless(zstd, 'zstd not available')
def testzstd8mbencodingsinglebyteframes(self):
reactor = framing.clientreactor(globalui, buffersends=False)
request, action, meta = reactor.callcommand(b'foo', {})
for f in meta[b'framegen']:
pass
action, meta = sendframe(reactor,
ffs(b'%d 2 stream-begin stream-settings eos cbor:b"zstd-8mb"' %
request.requestid))
self.assertEqual(action, b'noop')
self.assertEqual(meta, {})
result = {
b'status': b'ok',
}
encoded = b''.join(cborutil.streamencode(result))
compressed = zstd.ZstdCompressor().compress(encoded)
self.assertEqual(zstd.ZstdDecompressor().decompress(compressed),
encoded)
chunks = []
for i in range(len(compressed)):
char = compressed[i:i + 1]
if char == b'\\':
char = b'\\\\'
action, meta = sendframe(reactor,
ffs(b'%d 2 encoded command-response continuation %s' %
(request.requestid, char)))
self.assertEqual(action, b'responsedata')
chunks.append(meta[b'data'])
self.assertTrue(meta[b'expectmore'])
self.assertFalse(meta[b'eos'])
# zstd decompressor will flush at frame boundaries.
self.assertEqual(b''.join(chunks), encoded)
# End the stream for good measure.
action, meta = sendframe(reactor,
ffs(b'%d 2 stream-end command-response eos ' % request.requestid))
self.assertEqual(action, b'responsedata')
self.assertEqual(meta[b'data'], b'')
self.assertFalse(meta[b'expectmore'])
self.assertTrue(meta[b'eos'])
@unittest.skipUnless(zstd, 'zstd not available')
def testzstd8mbmultipleresponses(self):
# We feed in zstd compressed data on the same stream but belonging to
# 2 different requests. This tests our flushing behavior.
reactor = framing.clientreactor(globalui, buffersends=False,
hasmultiplesend=True)
request1, action, meta = reactor.callcommand(b'foo', {})
for f in meta[b'framegen']:
pass
request2, action, meta = reactor.callcommand(b'foo', {})
for f in meta[b'framegen']:
pass
outstream = framing.outputstream(2)
outstream.setencoder(globalui, b'zstd-8mb')
response1 = b''.join(cborutil.streamencode({
b'status': b'ok',
b'extra': b'response1' * 10,
}))
response2 = b''.join(cborutil.streamencode({
b'status': b'error',
b'extra': b'response2' * 10,
}))
action, meta = sendframe(reactor,
ffs(b'%d 2 stream-begin stream-settings eos cbor:b"zstd-8mb"' %
request1.requestid))
self.assertEqual(action, b'noop')
self.assertEqual(meta, {})
# Feeding partial data in won't get anything useful out.
action, meta = sendframe(reactor,
ffs(b'%d 2 encoded command-response continuation %s' % (
request1.requestid, outstream.encode(response1))))
self.assertEqual(action, b'responsedata')
self.assertEqual(meta[b'data'], b'')
# But flushing data at both ends will get our original data.
action, meta = sendframe(reactor,
ffs(b'%d 2 encoded command-response eos %s' % (
request1.requestid, outstream.flush())))
self.assertEqual(action, b'responsedata')
self.assertEqual(meta[b'data'], response1)
# We should be able to reuse the compressor/decompressor for the
# 2nd response.
action, meta = sendframe(reactor,
ffs(b'%d 2 encoded command-response continuation %s' % (
request2.requestid, outstream.encode(response2))))
self.assertEqual(action, b'responsedata')
self.assertEqual(meta[b'data'], b'')
action, meta = sendframe(reactor,
ffs(b'%d 2 encoded command-response eos %s' % (
request2.requestid, outstream.flush())))
self.assertEqual(action, b'responsedata')
self.assertEqual(meta[b'data'], response2)
Gregory Szorc
wireproto: introduce a reactor for client-side state...
r37561 if __name__ == '__main__':
import silenttestrunner
silenttestrunner.main(__name__)