##// END OF EJS Templates
rebase: allow aborting if last-message.txt is missing...
rebase: allow aborting if last-message.txt is missing Previously, if .hg/rebasestate existed but .hg/last-message.txt was missing, 'hg rebase --abort' would say there's no rebase in progress but 'hg checkout foo' would say 'abort: rebase in progress'. It turns out loading the collapse message will throw a "no rebase in progress" error if the file doesn't exist, even though .hg/rebasestate obviously indicates a rebase is in progress. The fix is to only throw an exception if we're trying to --continue, and to just eat the issues if we're doing --abort. This issue is exposed by us writing the rebase state earlier in the process. This will be used by later patches to ensure the user can appropriately 'hg rebase --abort' if there's a crash before the first the first commit has finished rebasing. Tests cover all of this. The only negative affect is we now require a hg rebase --abort in a very specific exception case, as shown in the test.

File last commit:

r30895:c32454d6 default
r31225:749b057b default
Show More
test_roundtrip.py
68 lines | 2.2 KiB | text/x-python | PythonLexer
import io
try:
import unittest2 as unittest
except ImportError:
import unittest
try:
import hypothesis
import hypothesis.strategies as strategies
except ImportError:
raise unittest.SkipTest('hypothesis not available')
import zstd
from .common import (
make_cffi,
)
compression_levels = strategies.integers(min_value=1, max_value=22)
@make_cffi
class TestRoundTrip(unittest.TestCase):
@hypothesis.given(strategies.binary(), compression_levels)
def test_compress_write_to(self, data, level):
"""Random data from compress() roundtrips via write_to."""
cctx = zstd.ZstdCompressor(level=level)
compressed = cctx.compress(data)
buffer = io.BytesIO()
dctx = zstd.ZstdDecompressor()
with dctx.write_to(buffer) as decompressor:
decompressor.write(compressed)
self.assertEqual(buffer.getvalue(), data)
@hypothesis.given(strategies.binary(), compression_levels)
def test_compressor_write_to_decompressor_write_to(self, data, level):
"""Random data from compressor write_to roundtrips via write_to."""
compress_buffer = io.BytesIO()
decompressed_buffer = io.BytesIO()
cctx = zstd.ZstdCompressor(level=level)
with cctx.write_to(compress_buffer) as compressor:
compressor.write(data)
dctx = zstd.ZstdDecompressor()
with dctx.write_to(decompressed_buffer) as decompressor:
decompressor.write(compress_buffer.getvalue())
self.assertEqual(decompressed_buffer.getvalue(), data)
@hypothesis.given(strategies.binary(average_size=1048576))
@hypothesis.settings(perform_health_check=False)
def test_compressor_write_to_decompressor_write_to_larger(self, data):
compress_buffer = io.BytesIO()
decompressed_buffer = io.BytesIO()
cctx = zstd.ZstdCompressor(level=5)
with cctx.write_to(compress_buffer) as compressor:
compressor.write(data)
dctx = zstd.ZstdDecompressor()
with dctx.write_to(decompressed_buffer) as decompressor:
decompressor.write(compress_buffer.getvalue())
self.assertEqual(decompressed_buffer.getvalue(), data)