##// END OF EJS Templates
localrepo: use context manager for lock and transaction in commitctx()...
localrepo: use context manager for lock and transaction in commitctx() Differential Revision: https://phab.mercurial-scm.org/D5624

File last commit:

r41154:685cf59a default
r41400:0132221c default
Show More
test-wireproto-clientreactor.py
610 lines | 21.0 KiB | text/x-python | PythonLexer
/ tests / test-wireproto-clientreactor.py
from __future__ import absolute_import
import sys
import unittest
import zlib
from mercurial import (
error,
ui as uimod,
wireprotoframing as framing,
)
from mercurial.utils import (
cborutil,
)
# zstd support is optional: ``zstd`` is set to None when it cannot be
# imported, and the zstd tests below are skipped in that case.
try:
    from mercurial import zstd
    # Touch an attribute while still inside the ``try`` so that a failure
    # surfaces here as ImportError.
    # NOTE(review): presumably needed because the import may be resolved
    # lazily -- confirm before moving this line.
    zstd.__version__
except ImportError:
    zstd = None

# Short alias for framing.makeframefromhumanstring; the tests use it to
# build the frames they feed to sendframe() from a textual description.
ffs = framing.makeframefromhumanstring

# Single ui instance shared by every reactor and encoder created below.
globalui = uimod.ui()
def sendframe(reactor, frame):
    """Deliver a raw frame bytearray to ``reactor``.

    The header is parsed from ``frame``, everything after the first
    ``framing.FRAME_HEADER_SIZE`` bytes is taken as the payload, and the
    rebuilt frame object is passed to ``reactor.onframerecv()``, whose
    result is returned unchanged.
    """
    hdr = framing.parseheader(frame)
    body = frame[framing.FRAME_HEADER_SIZE:]
    # The length announced by the header must match the bytes that follow.
    assert len(body) == hdr.length

    received = framing.frame(hdr.requestid, hdr.streamid, hdr.streamflags,
                             hdr.typeid, hdr.flags, body)
    return reactor.onframerecv(received)
class SingleSendTests(unittest.TestCase):
    """A reactor that can only send once rejects subsequent sends."""

    if not getattr(unittest.TestCase, 'assertRaisesRegex', False):
        # Python 3.7 deprecates the regex*p* version, but 2.7 lacks
        # the regex version.
        assertRaisesRegex = (# camelcase-required
            unittest.TestCase.assertRaisesRegexp)

    def testbasic(self):
        """One buffered command is flushed; later commands are refused."""
        reactor = framing.clientreactor(
            globalui, hasmultiplesend=False, buffersends=True)

        # Sends are buffered, so issuing the command yields no frames yet.
        request, action, meta = reactor.callcommand(b'foo', {})
        self.assertEqual(request.state, b'pending')
        self.assertEqual(action, b'noop')

        action, meta = reactor.flushcommands()
        self.assertEqual(action, b'sendframes')

        # The request reports "sending" for as long as frames are produced.
        for _frame in meta[b'framegen']:
            self.assertEqual(request.state, b'sending')

        self.assertEqual(request.state, b'sent')

        # Try twice: the refusal must persist across repeated attempts.
        for _attempt in range(2):
            with self.assertRaisesRegex(error.ProgrammingError,
                                        'cannot issue new commands'):
                reactor.callcommand(b'foo', {})
class NoBufferTests(unittest.TestCase):
    """A reactor without send buffering sends requests immediately."""

    def _drainframes(self, request, meta):
        """Consume the frame generator in ``meta`` and check state changes.

        ``request`` must report ``sending`` while frames are being emitted
        and ``sent`` once the generator is exhausted.
        """
        for _frame in meta[b'framegen']:
            self.assertEqual(request.state, b'sending')

        self.assertEqual(request.state, b'sent')

    def testbasic(self):
        """Each command produces frames right away, without a flush."""
        reactor = framing.clientreactor(
            globalui, hasmultiplesend=True, buffersends=False)

        first, action, meta = reactor.callcommand(b'command1', {})
        self.assertEqual(first.requestid, 1)
        self.assertEqual(action, b'sendframes')
        # Nothing has been pulled from the frame generator yet.
        self.assertEqual(first.state, b'pending')
        self._drainframes(first, meta)

        # Nothing is buffered, so a flush has nothing to do.
        action, meta = reactor.flushcommands()
        self.assertEqual(action, b'noop')

        # And we can send another command.
        second, action, meta = reactor.callcommand(b'command2', {})
        self.assertEqual(second.requestid, 3)
        self.assertEqual(action, b'sendframes')
        self._drainframes(second, meta)
class BadFrameRecvTests(unittest.TestCase):
    """Unexpected incoming frames are rejected by the client reactor."""

    if not getattr(unittest.TestCase, 'assertRaisesRegex', False):
        # Python 3.7 deprecates the regex*p* version, but 2.7 lacks
        # the regex version.
        assertRaisesRegex = (# camelcase-required
            unittest.TestCase.assertRaisesRegexp)

    def testoddstream(self):
        """A frame on stream ID 1 results in an ``error`` action."""
        reactor = framing.clientreactor(globalui)

        incoming = ffs(b'1 1 0 1 0 foo')
        action, meta = sendframe(reactor, incoming)

        self.assertEqual(action, b'error')
        self.assertEqual(
            meta[b'message'],
            b'received frame with odd numbered stream ID: 1')

    def testunknownstream(self):
        """A frame on a new stream lacking stream-begin is an error."""
        reactor = framing.clientreactor(globalui)

        incoming = ffs(b'1 0 0 1 0 foo')
        action, meta = sendframe(reactor, incoming)

        self.assertEqual(action, b'error')
        self.assertEqual(
            meta[b'message'],
            b'received frame on unknown stream without beginning '
            b'of stream flag set')

    def testunhandledframetype(self):
        """A ``text-output`` frame raises ProgrammingError."""
        reactor = framing.clientreactor(globalui, buffersends=False)

        # Issue a command and consume its frames before feeding a reply.
        request, action, meta = reactor.callcommand(b'foo', {})
        for _frame in meta[b'framegen']:
            pass

        with self.assertRaisesRegex(error.ProgrammingError,
                                    'unhandled frame type'):
            sendframe(reactor, ffs(b'1 0 stream-begin text-output 0 foo'))
class StreamTests(unittest.TestCase):
    """Responses spanning several frames on one stream."""

    def testmultipleresponseframes(self):
        """Both frames of a two-frame response yield ``responsedata``."""
        reactor = framing.clientreactor(globalui, buffersends=False)

        request, action, meta = reactor.callcommand(b'foo', {})
        self.assertEqual(action, b'sendframes')
        for _frame in meta[b'framegen']:
            pass

        # First frame opens the stream, second one carries the eos flag.
        templates = (
            b'%d 0 stream-begin command-response 0 foo',
            b'%d 0 0 command-response eos bar',
        )
        for template in templates:
            action, meta = sendframe(
                reactor, ffs(template % request.requestid))
            self.assertEqual(action, b'responsedata')
class RedirectTests(unittest.TestCase):
    """Redirect settings are serialized into the command request."""

    def testredirect(self):
        """The ``redirect`` argument ends up in the single request frame."""
        reactor = framing.clientreactor(globalui, buffersends=False)

        request, action, meta = reactor.callcommand(
            b'foo', {},
            redirect={
                b'targets': [b'a', b'b'],
                b'hashes': [b'sha256'],
            })
        self.assertEqual(action, b'sendframes')

        emitted = list(meta[b'framegen'])
        self.assertEqual(len(emitted), 1)

        expected = ffs(b'1 1 stream-begin command-request new '
                       b"cbor:{b'name': b'foo', "
                       b"b'redirect': {b'targets': [b'a', b'b'], "
                       b"b'hashes': [b'sha256']}}")
        self.assertEqual(emitted[0], expected)
class StreamSettingsTests(unittest.TestCase):
    """Client reactor handling of ``stream-settings`` frames."""

    def _startcommand(self, reactor):
        """Issue a ``foo`` command on ``reactor`` and consume its frames.

        Returns the request object handed back by ``callcommand()``.
        """
        request, action, meta = reactor.callcommand(b'foo', {})
        for _frame in meta[b'framegen']:
            pass
        return request

    def _newreactor(self):
        """Create an unbuffered reactor and issue one command on it.

        Returns a ``(reactor, request)`` tuple; the frames generated for
        the command have already been consumed.
        """
        reactor = framing.clientreactor(globalui, buffersends=False)
        return reactor, self._startcommand(reactor)

    def _checksinglebyteframes(self, reactor, request, compressed, encoded):
        """Feed ``compressed`` to ``reactor`` one byte per frame.

        Each byte is sent in its own ``continuation`` command-response
        frame. The data chunks reported back must concatenate to
        ``encoded``. An empty ``eos`` frame is sent afterwards and must
        report no data and the end of the response.
        """
        chunks = []
        for pos in range(len(compressed)):
            # Slice instead of indexing so we always get a bytes object.
            byte = compressed[pos:pos + 1]
            # NOTE(review): a lone backslash is doubled, presumably
            # because the human frame string treats it as an escape.
            if byte == b'\\':
                byte = b'\\\\'

            action, meta = sendframe(
                reactor,
                ffs(b'%d 2 encoded command-response continuation %s' %
                    (request.requestid, byte)))
            self.assertEqual(action, b'responsedata')
            chunks.append(meta[b'data'])
            self.assertTrue(meta[b'expectmore'])
            self.assertFalse(meta[b'eos'])

        # All payload must have been decoded before the stream is ended.
        self.assertEqual(b''.join(chunks), encoded)

        # End the stream for good measure.
        action, meta = sendframe(
            reactor,
            ffs(b'%d 2 stream-end command-response eos ' %
                request.requestid))
        self.assertEqual(action, b'responsedata')
        self.assertEqual(meta[b'data'], b'')
        self.assertFalse(meta[b'expectmore'])
        self.assertTrue(meta[b'eos'])

    def _checkmultipleresponses(self, encodername, settingstemplate):
        """Send two encoded responses for two requests over one stream.

        ``encodername`` selects the encoder of the local output stream
        that produces the payloads. ``settingstemplate`` is the human
        frame string (with a ``%d`` slot for the request ID) of the
        stream-settings frame announcing that encoding to the reactor.
        """
        reactor = framing.clientreactor(globalui, buffersends=False,
                                        hasmultiplesend=True)
        request1 = self._startcommand(reactor)
        request2 = self._startcommand(reactor)

        outstream = framing.outputstream(2)
        outstream.setencoder(globalui, encodername)

        response1 = b''.join(cborutil.streamencode({
            b'status': b'ok',
            b'extra': b'response1' * 10,
        }))
        response2 = b''.join(cborutil.streamencode({
            b'status': b'error',
            b'extra': b'response2' * 10,
        }))

        action, meta = sendframe(
            reactor, ffs(settingstemplate % request1.requestid))
        self.assertEqual(action, b'noop')
        self.assertEqual(meta, {})

        # The second round checks that the compressor/decompressor pair
        # can be reused for another response on the same stream.
        exchanges = ((request1, response1), (request2, response2))
        for request, response in exchanges:
            # Feeding partial data in won't get anything useful out.
            action, meta = sendframe(
                reactor,
                ffs(b'%d 2 encoded command-response continuation %s' % (
                    request.requestid, outstream.encode(response))))
            self.assertEqual(action, b'responsedata')
            self.assertEqual(meta[b'data'], b'')

            # But flushing data at both ends will get our original data.
            action, meta = sendframe(
                reactor,
                ffs(b'%d 2 encoded command-response eos %s' % (
                    request.requestid, outstream.flush())))
            self.assertEqual(action, b'responsedata')
            self.assertEqual(meta[b'data'], response)

    def testnoflags(self):
        """A settings frame with neither continuation nor eos is an error."""
        reactor, _request = self._newreactor()

        action, meta = sendframe(
            reactor, ffs(b'1 2 stream-begin stream-settings 0 '))

        self.assertEqual(action, b'error')
        self.assertEqual(meta, {
            b'message': b'stream encoding settings frame must have '
                        b'continuation or end of stream flag set',
        })

    def testconflictflags(self):
        """A settings frame with both continuation and eos is an error."""
        reactor, _request = self._newreactor()

        action, meta = sendframe(
            reactor,
            ffs(b'1 2 stream-begin stream-settings continuation|eos '))

        self.assertEqual(action, b'error')
        self.assertEqual(meta, {
            b'message': b'stream encoding settings frame cannot have both '
                        b'continuation and end of stream flags set',
        })

    def testemptypayload(self):
        """A final settings frame without any payload is an error."""
        reactor, _request = self._newreactor()

        action, meta = sendframe(
            reactor, ffs(b'1 2 stream-begin stream-settings eos '))

        self.assertEqual(action, b'error')
        self.assertEqual(meta, {
            b'message': b'stream encoding settings frame did not contain '
                        b'CBOR data'
        })

    def testbadcbor(self):
        """A payload of ``badvalue`` (no ``cbor:`` prefix) is an error."""
        reactor, _request = self._newreactor()

        action, meta = sendframe(
            reactor, ffs(b'1 2 stream-begin stream-settings eos badvalue'))

        self.assertEqual(action, b'error')

    def testsingleobject(self):
        """A single ``identity`` value is accepted silently."""
        reactor, _request = self._newreactor()

        action, meta = sendframe(
            reactor,
            ffs(b'1 2 stream-begin stream-settings eos cbor:b"identity"'))

        self.assertEqual(action, b'noop')
        self.assertEqual(meta, {})

    def testmultipleobjects(self):
        """Extra values after ``identity`` are rejected."""
        reactor, _request = self._newreactor()

        encodedvalues = [
            b''.join(cborutil.streamencode(value))
            for value in (b'identity', {b'foo', b'bar'})
        ]
        data = b''.join(encodedvalues)

        action, meta = sendframe(
            reactor,
            ffs(b'1 2 stream-begin stream-settings eos %s' % data))

        self.assertEqual(action, b'error')
        self.assertEqual(meta, {
            b'message': b'error setting stream decoder: identity decoder '
                        b'received unexpected additional values',
        })

    def testmultipleframes(self):
        """Settings split over a continuation and an eos frame work."""
        reactor, _request = self._newreactor()

        data = b''.join(cborutil.streamencode(b'identity'))
        head, tail = data[:3], data[3:]

        action, meta = sendframe(
            reactor,
            ffs(b'1 2 stream-begin stream-settings continuation %s' % head))
        self.assertEqual(action, b'noop')
        self.assertEqual(meta, {})

        action, meta = sendframe(
            reactor, ffs(b'1 2 0 stream-settings eos %s' % tail))
        self.assertEqual(action, b'noop')
        self.assertEqual(meta, {})

    def testinvalidencoder(self):
        """An unknown decoder name is reported as an error."""
        reactor, _request = self._newreactor()

        action, meta = sendframe(
            reactor,
            ffs(b'1 2 stream-begin stream-settings eos cbor:b"badvalue"'))

        self.assertEqual(action, b'error')
        self.assertEqual(meta, {
            b'message': b'error setting stream decoder: unknown stream '
                        b'decoder: badvalue',
        })

    def testzlibencoding(self):
        """A zlib-compressed response in one frame is decoded fully."""
        reactor, request = self._newreactor()

        action, meta = sendframe(
            reactor,
            ffs(b'%d 2 stream-begin stream-settings eos cbor:b"zlib"' %
                request.requestid))
        self.assertEqual(action, b'noop')
        self.assertEqual(meta, {})

        encoded = b''.join(cborutil.streamencode({b'status': b'ok'}))
        compressed = zlib.compress(encoded)
        # Sanity check of the fixture itself.
        self.assertEqual(zlib.decompress(compressed), encoded)

        action, meta = sendframe(
            reactor,
            ffs(b'%d 2 encoded command-response eos %s' %
                (request.requestid, compressed)))
        self.assertEqual(action, b'responsedata')
        self.assertEqual(meta[b'data'], encoded)

    def testzlibencodingsinglebyteframes(self):
        """zlib data fed one byte per frame is decoded fully."""
        reactor, request = self._newreactor()

        action, meta = sendframe(
            reactor,
            ffs(b'%d 2 stream-begin stream-settings eos cbor:b"zlib"' %
                request.requestid))
        self.assertEqual(action, b'noop')
        self.assertEqual(meta, {})

        encoded = b''.join(cborutil.streamencode({b'status': b'ok'}))
        compressed = zlib.compress(encoded)
        self.assertEqual(zlib.decompress(compressed), encoded)

        # zlib will have the full data decoded once every byte was fed,
        # even though we haven't flushed.
        self._checksinglebyteframes(reactor, request, compressed, encoded)

    def testzlibmultipleresponses(self):
        """Two zlib responses on one stream are flushed independently."""
        # We feed in zlib compressed data on the same stream but belonging to
        # 2 different requests. This tests our flushing behavior.
        self._checkmultipleresponses(
            b'zlib',
            b'%d 2 stream-begin stream-settings eos cbor:b"zlib"')

    @unittest.skipUnless(zstd, 'zstd not available')
    def testzstd8mbencoding(self):
        """A zstd-8mb response in one frame is decoded fully."""
        reactor, request = self._newreactor()

        action, meta = sendframe(
            reactor,
            ffs(b'%d 2 stream-begin stream-settings eos cbor:b"zstd-8mb"' %
                request.requestid))
        self.assertEqual(action, b'noop')
        self.assertEqual(meta, {})

        encoded = b''.join(cborutil.streamencode({b'status': b'ok'}))

        encoder = framing.zstd8mbencoder(globalui)
        compressed = encoder.encode(encoded) + encoder.finish()
        # Sanity check of the fixture itself.
        roundtripped = zstd.ZstdDecompressor().decompress(
            compressed, max_output_size=len(encoded))
        self.assertEqual(roundtripped, encoded)

        action, meta = sendframe(
            reactor,
            ffs(b'%d 2 encoded command-response eos %s' %
                (request.requestid, compressed)))
        self.assertEqual(action, b'responsedata')
        self.assertEqual(meta[b'data'], encoded)

    @unittest.skipUnless(zstd, 'zstd not available')
    def testzstd8mbencodingsinglebyteframes(self):
        """zstd data fed one byte per frame is decoded fully."""
        reactor, request = self._newreactor()

        action, meta = sendframe(
            reactor,
            ffs(b'%d 2 stream-begin stream-settings eos cbor:b"zstd-8mb"' %
                request.requestid))
        self.assertEqual(action, b'noop')
        self.assertEqual(meta, {})

        encoded = b''.join(cborutil.streamencode({b'status': b'ok'}))
        compressed = zstd.ZstdCompressor().compress(encoded)
        self.assertEqual(zstd.ZstdDecompressor().decompress(compressed),
                         encoded)

        # zstd decompressor will flush at frame boundaries.
        self._checksinglebyteframes(reactor, request, compressed, encoded)

    @unittest.skipUnless(zstd, 'zstd not available')
    def testzstd8mbmultipleresponses(self):
        """Two zstd-8mb responses on one stream are flushed independently."""
        # We feed in zstd compressed data on the same stream but belonging to
        # 2 different requests. This tests our flushing behavior.
        self._checkmultipleresponses(
            b'zstd-8mb',
            b'%d 2 stream-begin stream-settings eos cbor:b"zstd-8mb"')
if __name__ == '__main__':
    # Python 3.6.0 through 3.6.3 inclusive shipped with
    # https://bugs.python.org/issue31825 and we can't run these
    # tests on those specific versions of Python. Sigh.
    affected = (3, 6, 0) <= sys.version_info < (3, 6, 4)
    if affected:
        # NOTE(review): 80 is presumably the status the test harness
        # treats as "skipped" -- confirm against the runner.
        sys.exit(80)

    # Imported here rather than at the top of the file; it is only needed
    # when the module is run as a script.
    import silenttestrunner
    silenttestrunner.main(__name__)