##// END OF EJS Templates
copies: add config option for writing copy metadata to file and/or changeset...
copies: add config option for writing copy metadata to file and/or changeset This introduces a config option that lets you choose to write copy metadata to the changeset extras instead of to the filelog. There's also an option to write it to both places. I imagine that may possibly be useful when transitioning an existing repo. The copy metadata is stored as two fields in extras: one for copies since p1 and one for copies since p2. I may need to add more information later in order to make copy tracing faster. Specifically, I'm thinking about recording which files were added or removed so that copies._chaincopies() doesn't have to look at the manifest for that. But that would just be an optimization and that can be added once we know if it's necessary. I have also considered saving space by replacing the destination file path with an index into the "files" list, but that can also be changed later (but before the feature is ready to release). Differential Revision: https://phab.mercurial-scm.org/D6183

File last commit:

r39447:fcc6bd11 default
r42317:0e41f40b default
Show More
test-wireproto-framing.py
194 lines | 6.9 KiB | text/x-python | PythonLexer
/ tests / test-wireproto-framing.py
from __future__ import absolute_import, print_function
import unittest
from mercurial import (
util,
wireprotoframing as framing,
)
# Short alias: the tests below call ffs() to turn a human-readable frame
# description into the frame bytes they compare against.
ffs = framing.makeframefromhumanstring
class FrameHumanStringTests(unittest.TestCase):
    """Compare ``ffs()`` output with literal expected frame bytes."""

    def testbasic(self):
        # Human strings with no payload and with a literal ``foo`` payload.
        cases = [
            (b'1 1 0 1 0 ',
             b'\x00\x00\x00\x01\x00\x01\x00\x10'),
            (b'2 4 0 1 0 ',
             b'\x00\x00\x00\x02\x00\x04\x00\x10'),
            (b'2 4 0 1 0 foo',
             b'\x03\x00\x00\x02\x00\x04\x00\x10foo'),
        ]
        for human, wire in cases:
            self.assertEqual(ffs(human), wire)

    def testcborint(self):
        # ``cbor:<int>`` payloads: small, larger, zero and negative values.
        cases = [
            (b'1 1 0 1 0 cbor:15',
             b'\x01\x00\x00\x01\x00\x01\x00\x10\x0f'),
            (b'1 1 0 1 0 cbor:42',
             b'\x02\x00\x00\x01\x00\x01\x00\x10\x18*'),
            (b'1 1 0 1 0 cbor:1048576',
             b'\x05\x00\x00\x01\x00\x01\x00\x10\x1a'
             b'\x00\x10\x00\x00'),
            (b'1 1 0 1 0 cbor:0',
             b'\x01\x00\x00\x01\x00\x01\x00\x10\x00'),
            (b'1 1 0 1 0 cbor:-1',
             b'\x01\x00\x00\x01\x00\x01\x00\x10 '),
            (b'1 1 0 1 0 cbor:-342542',
             b'\x05\x00\x00\x01\x00\x01\x00\x10:\x00\x05:\r'),
        ]
        for human, wire in cases:
            self.assertEqual(ffs(human), wire)

    def testcborstrings(self):
        # A ``cbor:`` payload holding a single bytes value.
        human = b"1 1 0 1 0 cbor:b'foo'"
        expected = b'\x04\x00\x00\x01\x00\x01\x00\x10Cfoo'
        self.assertEqual(ffs(human), expected)

    def testcborlists(self):
        # A ``cbor:`` payload holding a list of mixed value types.
        human = b"1 1 0 1 0 cbor:[None, True, False, 42, b'foo']"
        expected = (b'\n\x00\x00\x01\x00\x01\x00\x10\x85\xf6\xf5\xf4'
                    b'\x18*Cfoo')
        self.assertEqual(ffs(human), expected)

    def testcbordicts(self):
        # The expected bytes list the b'bar' entry before b'foo', even
        # though the human string names b'foo' first.
        human = (b"1 1 0 1 0 "
                 b"cbor:{b'foo': b'val1', b'bar': b'val2'}")
        expected = (b'\x13\x00\x00\x01\x00\x01\x00\x10\xa2'
                    b'CbarDval2CfooDval1')
        self.assertEqual(ffs(human), expected)
class FrameTests(unittest.TestCase):
    """Compare frames emitted by the framing helpers with ``ffs()`` frames.

    Covers ``framing.createcommandframes()`` and
    ``framing.createtextoutputframe()``.
    """

    def testdataexactframesize(self):
        # Payload of exactly DEFAULT_MAX_FRAME_SIZE bytes: one continuation
        # frame carrying all of it, then an eos frame with no payload.
        payload = util.bytesio(b'x' * framing.DEFAULT_MAX_FRAME_SIZE)
        outstream = framing.stream(1)
        emitted = list(framing.createcommandframes(
            outstream, 1, b'command', {}, payload))

        expected = [
            ffs(b'1 1 stream-begin command-request new|have-data '
                b"cbor:{b'name': b'command'}"),
            ffs(b'1 1 0 command-data continuation %s' % payload.getvalue()),
            ffs(b'1 1 0 command-data eos '),
        ]
        self.assertEqual(emitted, expected)

    def testdatamultipleframes(self):
        # One byte over DEFAULT_MAX_FRAME_SIZE: the extra byte ends up in
        # the eos frame.
        payload = util.bytesio(b'x' * (framing.DEFAULT_MAX_FRAME_SIZE + 1))
        outstream = framing.stream(1)
        emitted = list(framing.createcommandframes(
            outstream, 1, b'command', {}, payload))

        expected = [
            ffs(b'1 1 stream-begin command-request new|have-data '
                b"cbor:{b'name': b'command'}"),
            ffs(b'1 1 0 command-data continuation %s' % (
                b'x' * framing.DEFAULT_MAX_FRAME_SIZE)),
            ffs(b'1 1 0 command-data eos x'),
        ]
        self.assertEqual(emitted, expected)

    def testargsanddata(self):
        # Command arguments appear under b'args' in the request frame; the
        # 100-byte payload fits in a single eos data frame.
        payload = util.bytesio(b'x' * 100)
        outstream = framing.stream(1)
        cmdargs = {
            b'key1': b'key1value',
            b'key2': b'key2value',
            b'key3': b'key3value',
        }
        emitted = list(framing.createcommandframes(
            outstream, 1, b'command', cmdargs, payload))

        expected = [
            ffs(b'1 1 stream-begin command-request new|have-data '
                b"cbor:{b'name': b'command', b'args': {b'key1': b'key1value', "
                b"b'key2': b'key2value', b'key3': b'key3value'}}"),
            ffs(b'1 1 0 command-data eos %s' % payload.getvalue()),
        ]
        self.assertEqual(emitted, expected)

    if not getattr(unittest.TestCase, 'assertRaisesRegex', False):
        # Compatibility shim: Python 3.7 deprecates the regex*p* spelling,
        # but 2.7 only provides that one, so alias it when the newer name
        # is missing.
        assertRaisesRegex = (# camelcase-required
            unittest.TestCase.assertRaisesRegexp)

    def testtextoutputformattingstringtype(self):
        """Formatting string must be bytes."""
        atoms = [(b'foo'.decode('ascii'), [], [])]
        with self.assertRaisesRegex(ValueError, 'must use bytes formatting '):
            list(framing.createtextoutputframe(None, 1, atoms))

    def testtextoutputargumentbytes(self):
        # A non-bytes formatting argument is rejected with ValueError.
        atoms = [(b'foo', [b'foo'.decode('ascii')], [])]
        with self.assertRaisesRegex(ValueError, 'must use bytes for argument'):
            list(framing.createtextoutputframe(None, 1, atoms))

    def testtextoutputlabelbytes(self):
        # A non-bytes label is rejected with ValueError.
        atoms = [(b'foo', [], [b'foo'.decode('ascii')])]
        with self.assertRaisesRegex(ValueError, 'must use bytes for labels'):
            list(framing.createtextoutputframe(None, 1, atoms))

    def testtextoutput1simpleatom(self):
        # One atom with neither arguments nor labels.
        outstream = framing.stream(1)
        atoms = [(b'foo', [], [])]
        frames = list(framing.createtextoutputframe(outstream, 1, atoms))

        expected = [
            ffs(b'1 1 stream-begin text-output 0 '
                b"cbor:[{b'msg': b'foo'}]"),
        ]
        self.assertEqual(frames, expected)

    def testtextoutput2simpleatoms(self):
        # Two plain atoms are expected in a single frame.
        outstream = framing.stream(1)
        atoms = [
            (b'foo', [], []),
            (b'bar', [], []),
        ]
        frames = list(framing.createtextoutputframe(outstream, 1, atoms))

        expected = [
            ffs(b'1 1 stream-begin text-output 0 '
                b"cbor:[{b'msg': b'foo'}, {b'msg': b'bar'}]"),
        ]
        self.assertEqual(frames, expected)

    def testtextoutput1arg(self):
        # A single formatting argument shows up under b'args'.
        outstream = framing.stream(1)
        atoms = [(b'foo %s', [b'val1'], [])]
        frames = list(framing.createtextoutputframe(outstream, 1, atoms))

        expected = [
            ffs(b'1 1 stream-begin text-output 0 '
                b"cbor:[{b'msg': b'foo %s', b'args': [b'val1']}]"),
        ]
        self.assertEqual(frames, expected)

    def testtextoutput2arg(self):
        # Two formatting arguments keep their order under b'args'.
        outstream = framing.stream(1)
        atoms = [(b'foo %s %s', [b'val', b'value'], [])]
        frames = list(framing.createtextoutputframe(outstream, 1, atoms))

        expected = [
            ffs(b'1 1 stream-begin text-output 0 '
                b"cbor:[{b'msg': b'foo %s %s', b'args': [b'val', b'value']}]"),
        ]
        self.assertEqual(frames, expected)

    def testtextoutput1label(self):
        # A label shows up under b'labels'.
        outstream = framing.stream(1)
        atoms = [(b'foo', [], [b'label'])]
        frames = list(framing.createtextoutputframe(outstream, 1, atoms))

        expected = [
            ffs(b'1 1 stream-begin text-output 0 '
                b"cbor:[{b'msg': b'foo', b'labels': [b'label']}]"),
        ]
        self.assertEqual(frames, expected)

    def testargandlabel(self):
        # An atom carrying both an argument and a label.
        outstream = framing.stream(1)
        atoms = [(b'foo %s', [b'arg'], [b'label'])]
        frames = list(framing.createtextoutputframe(outstream, 1, atoms))

        expected = [
            ffs(b'1 1 stream-begin text-output 0 '
                b"cbor:[{b'msg': b'foo %s', b'args': [b'arg'], "
                b"b'labels': [b'label']}]"),
        ]
        self.assertEqual(frames, expected)
if __name__ == '__main__':
    # Imported here so the runner module is only loaded when this file is
    # executed as a script.
    import silenttestrunner

    silenttestrunner.main(__name__)