##// END OF EJS Templates
lfs: handle URLErrors to add additional information...
lfs: handle URLErrors to add additional information Sometimes the blob server is hit first (e.g. on push), and sometimes it's hit last (e.g. pull). Throw in depth first subrepo operations, and things quickly get insane. It wasn't even mentioning LFS, so just saying "connection refused" can be confusing- especially if the blob server is a secondary server and connecting to the repo server works. The exception handler for the transfer handler will print the full path to the blob, but that seems fine given that it might be necessary to debug a second server. (We don't yet support a standalone blob server, so the handler for the Batch API will cover 99.9% of the current problems. But it might as well be handled now while I'm thinking about it.) The function for translating to a message was mostly borrowed from scmutil.catchall().

File last commit:

r40157:73fef626 default
r40697:380f5131 default
Show More
test_train_dictionary.py
88 lines | 2.8 KiB | text/x-python | PythonLexer
import struct
import sys
import unittest
import zstandard as zstd
from . common import (
generate_samples,
make_cffi,
)
if sys.version_info[0] >= 3:
int_type = int
else:
int_type = long
@make_cffi
class TestTrainDictionary(unittest.TestCase):
def test_no_args(self):
with self.assertRaises(TypeError):
zstd.train_dictionary()
def test_bad_args(self):
with self.assertRaises(TypeError):
zstd.train_dictionary(8192, u'foo')
with self.assertRaises(ValueError):
zstd.train_dictionary(8192, [u'foo'])
def test_no_params(self):
d = zstd.train_dictionary(8192, generate_samples())
self.assertIsInstance(d.dict_id(), int_type)
# The dictionary ID may be different across platforms.
expected = b'\x37\xa4\x30\xec' + struct.pack('<I', d.dict_id())
data = d.as_bytes()
self.assertEqual(data[0:8], expected)
def test_basic(self):
d = zstd.train_dictionary(8192, generate_samples(), k=64, d=16)
self.assertIsInstance(d.dict_id(), int_type)
data = d.as_bytes()
self.assertEqual(data[0:4], b'\x37\xa4\x30\xec')
self.assertEqual(d.k, 64)
self.assertEqual(d.d, 16)
def test_set_dict_id(self):
d = zstd.train_dictionary(8192, generate_samples(), k=64, d=16,
dict_id=42)
self.assertEqual(d.dict_id(), 42)
def test_optimize(self):
d = zstd.train_dictionary(8192, generate_samples(), threads=-1, steps=1,
d=16)
# This varies by platform.
self.assertIn(d.k, (50, 2000))
self.assertEqual(d.d, 16)
@make_cffi
class TestCompressionDict(unittest.TestCase):
def test_bad_mode(self):
with self.assertRaisesRegexp(ValueError, 'invalid dictionary load mode'):
zstd.ZstdCompressionDict(b'foo', dict_type=42)
def test_bad_precompute_compress(self):
d = zstd.train_dictionary(8192, generate_samples(), k=64, d=16)
with self.assertRaisesRegexp(ValueError, 'must specify one of level or '):
d.precompute_compress()
with self.assertRaisesRegexp(ValueError, 'must only specify one of level or '):
d.precompute_compress(level=3,
compression_params=zstd.CompressionParameters())
def test_precompute_compress_rawcontent(self):
d = zstd.ZstdCompressionDict(b'dictcontent' * 64,
dict_type=zstd.DICT_TYPE_RAWCONTENT)
d.precompute_compress(level=1)
d = zstd.ZstdCompressionDict(b'dictcontent' * 64,
dict_type=zstd.DICT_TYPE_FULLDICT)
with self.assertRaisesRegexp(zstd.ZstdError, 'unable to precompute dictionary'):
d.precompute_compress(level=1)