##// END OF EJS Templates
changegroup: move file matcher from narrow extension...
changegroup: move file matcher from narrow extension Sparse changegroup generation requires the use of a matcher to filter which files are relevant. This commit moves the file matcher from the narrow extension to core and updates the narrow extension to use it. I'm not sure why the narrow extension was storing the matcher as a callable that resolved to a matcher. So I changed it to be a simple matcher instance. In addition, code from narrow to intersect the matcher with the local narrow spec is now performed automatically when the changegroup packer is created. If a matcher is not passed into getbundler() an alwaysmatcher() is assumed. This ensures that a matcher is always defined for all operations. Differential Revision: https://phab.mercurial-scm.org/D4011

File last commit:

r37733:1859b9a7 default
r38818:9c057acb default
Show More
test-wireproto-framing.py
197 lines | 7.1 KiB | text/x-python | PythonLexer
/ tests / test-wireproto-framing.py
Gregory Szorc
tests: extract wire protocol framing tests to own file...
r37560 from __future__ import absolute_import, print_function
import unittest
from mercurial import (
util,
wireprotoframing as framing,
)
ffs = framing.makeframefromhumanstring
class FrameHumanStringTests(unittest.TestCase):
def testbasic(self):
self.assertEqual(ffs(b'1 1 0 1 0 '),
b'\x00\x00\x00\x01\x00\x01\x00\x10')
self.assertEqual(ffs(b'2 4 0 1 0 '),
b'\x00\x00\x00\x02\x00\x04\x00\x10')
self.assertEqual(ffs(b'2 4 0 1 0 foo'),
b'\x03\x00\x00\x02\x00\x04\x00\x10foo')
def testcborint(self):
self.assertEqual(ffs(b'1 1 0 1 0 cbor:15'),
b'\x01\x00\x00\x01\x00\x01\x00\x10\x0f')
self.assertEqual(ffs(b'1 1 0 1 0 cbor:42'),
b'\x02\x00\x00\x01\x00\x01\x00\x10\x18*')
self.assertEqual(ffs(b'1 1 0 1 0 cbor:1048576'),
b'\x05\x00\x00\x01\x00\x01\x00\x10\x1a'
b'\x00\x10\x00\x00')
self.assertEqual(ffs(b'1 1 0 1 0 cbor:0'),
b'\x01\x00\x00\x01\x00\x01\x00\x10\x00')
self.assertEqual(ffs(b'1 1 0 1 0 cbor:-1'),
b'\x01\x00\x00\x01\x00\x01\x00\x10 ')
self.assertEqual(ffs(b'1 1 0 1 0 cbor:-342542'),
b'\x05\x00\x00\x01\x00\x01\x00\x10:\x00\x05:\r')
def testcborstrings(self):
self.assertEqual(ffs(b"1 1 0 1 0 cbor:b'foo'"),
b'\x04\x00\x00\x01\x00\x01\x00\x10Cfoo')
self.assertEqual(ffs(b"1 1 0 1 0 cbor:u'foo'"),
b'\x04\x00\x00\x01\x00\x01\x00\x10cfoo')
def testcborlists(self):
self.assertEqual(ffs(b"1 1 0 1 0 cbor:[None, True, False, 42, b'foo']"),
b'\n\x00\x00\x01\x00\x01\x00\x10\x85\xf6\xf5\xf4'
b'\x18*Cfoo')
def testcbordicts(self):
self.assertEqual(ffs(b"1 1 0 1 0 "
b"cbor:{b'foo': b'val1', b'bar': b'val2'}"),
b'\x13\x00\x00\x01\x00\x01\x00\x10\xa2'
b'CbarDval2CfooDval1')
class FrameTests(unittest.TestCase):
def testdataexactframesize(self):
data = util.bytesio(b'x' * framing.DEFAULT_MAX_FRAME_SIZE)
stream = framing.stream(1)
frames = list(framing.createcommandframes(stream, 1, b'command',
{}, data))
self.assertEqual(frames, [
ffs(b'1 1 stream-begin command-request new|have-data '
b"cbor:{b'name': b'command'}"),
ffs(b'1 1 0 command-data continuation %s' % data.getvalue()),
ffs(b'1 1 0 command-data eos ')
])
def testdatamultipleframes(self):
data = util.bytesio(b'x' * (framing.DEFAULT_MAX_FRAME_SIZE + 1))
stream = framing.stream(1)
frames = list(framing.createcommandframes(stream, 1, b'command', {},
data))
self.assertEqual(frames, [
ffs(b'1 1 stream-begin command-request new|have-data '
b"cbor:{b'name': b'command'}"),
ffs(b'1 1 0 command-data continuation %s' % (
b'x' * framing.DEFAULT_MAX_FRAME_SIZE)),
ffs(b'1 1 0 command-data eos x'),
])
def testargsanddata(self):
data = util.bytesio(b'x' * 100)
stream = framing.stream(1)
frames = list(framing.createcommandframes(stream, 1, b'command', {
b'key1': b'key1value',
b'key2': b'key2value',
b'key3': b'key3value',
}, data))
self.assertEqual(frames, [
ffs(b'1 1 stream-begin command-request new|have-data '
b"cbor:{b'name': b'command', b'args': {b'key1': b'key1value', "
b"b'key2': b'key2value', b'key3': b'key3value'}}"),
ffs(b'1 1 0 command-data eos %s' % data.getvalue()),
])
Augie Fackler
cleanup: polyfill assertRaisesRegex so we can avoid assertRaisesRegexp...
r37733 if not getattr(unittest.TestCase, 'assertRaisesRegex', False):
# Python 3.7 deprecates the regex*p* version, but 2.7 lacks
# the regex version.
assertRaisesRegex = (# camelcase-required
unittest.TestCase.assertRaisesRegexp)
Gregory Szorc
tests: extract wire protocol framing tests to own file...
r37560 def testtextoutputformattingstringtype(self):
"""Formatting string must be bytes."""
Augie Fackler
cleanup: polyfill assertRaisesRegex so we can avoid assertRaisesRegexp...
r37733 with self.assertRaisesRegex(ValueError, 'must use bytes formatting '):
Gregory Szorc
tests: extract wire protocol framing tests to own file...
r37560 list(framing.createtextoutputframe(None, 1, [
(b'foo'.decode('ascii'), [], [])]))
def testtextoutputargumentbytes(self):
Augie Fackler
cleanup: polyfill assertRaisesRegex so we can avoid assertRaisesRegexp...
r37733 with self.assertRaisesRegex(ValueError, 'must use bytes for argument'):
Gregory Szorc
tests: extract wire protocol framing tests to own file...
r37560 list(framing.createtextoutputframe(None, 1, [
(b'foo', [b'foo'.decode('ascii')], [])]))
def testtextoutputlabelbytes(self):
Augie Fackler
cleanup: polyfill assertRaisesRegex so we can avoid assertRaisesRegexp...
r37733 with self.assertRaisesRegex(ValueError, 'must use bytes for labels'):
Gregory Szorc
tests: extract wire protocol framing tests to own file...
r37560 list(framing.createtextoutputframe(None, 1, [
(b'foo', [], [b'foo'.decode('ascii')])]))
def testtextoutput1simpleatom(self):
stream = framing.stream(1)
val = list(framing.createtextoutputframe(stream, 1, [
(b'foo', [], [])]))
self.assertEqual(val, [
ffs(b'1 1 stream-begin text-output 0 '
b"cbor:[{b'msg': b'foo'}]"),
])
def testtextoutput2simpleatoms(self):
stream = framing.stream(1)
val = list(framing.createtextoutputframe(stream, 1, [
(b'foo', [], []),
(b'bar', [], []),
]))
self.assertEqual(val, [
ffs(b'1 1 stream-begin text-output 0 '
b"cbor:[{b'msg': b'foo'}, {b'msg': b'bar'}]")
])
def testtextoutput1arg(self):
stream = framing.stream(1)
val = list(framing.createtextoutputframe(stream, 1, [
(b'foo %s', [b'val1'], []),
]))
self.assertEqual(val, [
ffs(b'1 1 stream-begin text-output 0 '
b"cbor:[{b'msg': b'foo %s', b'args': [b'val1']}]")
])
def testtextoutput2arg(self):
stream = framing.stream(1)
val = list(framing.createtextoutputframe(stream, 1, [
(b'foo %s %s', [b'val', b'value'], []),
]))
self.assertEqual(val, [
ffs(b'1 1 stream-begin text-output 0 '
b"cbor:[{b'msg': b'foo %s %s', b'args': [b'val', b'value']}]")
])
def testtextoutput1label(self):
stream = framing.stream(1)
val = list(framing.createtextoutputframe(stream, 1, [
(b'foo', [], [b'label']),
]))
self.assertEqual(val, [
ffs(b'1 1 stream-begin text-output 0 '
b"cbor:[{b'msg': b'foo', b'labels': [b'label']}]")
])
def testargandlabel(self):
stream = framing.stream(1)
val = list(framing.createtextoutputframe(stream, 1, [
(b'foo %s', [b'arg'], [b'label']),
]))
self.assertEqual(val, [
ffs(b'1 1 stream-begin text-output 0 '
b"cbor:[{b'msg': b'foo %s', b'args': [b'arg'], "
b"b'labels': [b'label']}]")
])
if __name__ == '__main__':
import silenttestrunner
silenttestrunner.main(__name__)