##// END OF EJS Templates
tests: use context manager form of assertRaises...
Gregory Szorc -
r32279:68c43a41 default
parent child Browse files
Show More
@@ -1,119 +1,120 b''
1 1 from __future__ import absolute_import
2 2
3 3 import glob
4 4 import os
5 5 import shutil
6 6 import tempfile
7 7 import unittest
8 8
9 9 from mercurial import (
10 10 util,
11 11 )
12 12 atomictempfile = util.atomictempfile
13 13
14 14 class testatomictempfile(unittest.TestCase):
15 15 def setUp(self):
16 16 self._testdir = tempfile.mkdtemp('atomictempfiletest')
17 17 self._filename = os.path.join(self._testdir, 'testfilename')
18 18
19 19 def tearDown(self):
20 20 shutil.rmtree(self._testdir, True)
21 21
22 22 def testsimple(self):
23 23 file = atomictempfile(self._filename)
24 24 self.assertFalse(os.path.isfile(self._filename))
25 25 tempfilename = file._tempname
26 26 self.assertTrue(tempfilename in glob.glob(
27 27 os.path.join(self._testdir, '.testfilename-*')))
28 28
29 29 file.write(b'argh\n')
30 30 file.close()
31 31
32 32 self.assertTrue(os.path.isfile(self._filename))
33 33 self.assertTrue(tempfilename not in glob.glob(
34 34 os.path.join(self._testdir, '.testfilename-*')))
35 35
36 36 # discard() removes the temp file without making the write permanent
37 37 def testdiscard(self):
38 38 file = atomictempfile(self._filename)
39 39 (dir, basename) = os.path.split(file._tempname)
40 40
41 41 file.write(b'yo\n')
42 42 file.discard()
43 43
44 44 self.assertFalse(os.path.isfile(self._filename))
45 45 self.assertTrue(basename not in os.listdir('.'))
46 46
47 47 # if a programmer screws up and passes bad args to atomictempfile, they
48 48 # get a plain ordinary TypeError, not infinite recursion
49 49 def testoops(self):
50 self.assertRaises(TypeError, atomictempfile)
50 with self.assertRaises(TypeError):
51 atomictempfile()
51 52
52 53 # checkambig=True avoids ambiguity of timestamp
53 54 def testcheckambig(self):
54 55 def atomicwrite(checkambig):
55 56 f = atomictempfile(self._filename, checkambig=checkambig)
56 57 f.write('FOO')
57 58 f.close()
58 59
59 60 # try some times, because reproduction of ambiguity depends on
60 61 # "filesystem time"
61 62 for i in xrange(5):
62 63 atomicwrite(False)
63 64 oldstat = os.stat(self._filename)
64 65 if oldstat.st_ctime != oldstat.st_mtime:
65 66 # subsequent changing never causes ambiguity
66 67 continue
67 68
68 69 repetition = 3
69 70
70 71 # repeat atomic write with checkambig=True, to examine
71 72 # whether st_mtime is advanced multiple times as expected
72 73 for j in xrange(repetition):
73 74 atomicwrite(True)
74 75 newstat = os.stat(self._filename)
75 76 if oldstat.st_ctime != newstat.st_ctime:
76 77 # timestamp ambiguity was naturally avoided while repetition
77 78 continue
78 79
79 80 # st_mtime should be advanced "repetition" times, because
80 81 # all atomicwrite() occurred at same time (in sec)
81 82 self.assertTrue(newstat.st_mtime ==
82 83 ((oldstat.st_mtime + repetition) & 0x7fffffff))
83 84 # no more examination is needed, if assumption above is true
84 85 break
85 86 else:
86 87 # This platform seems too slow to examine anti-ambiguity
87 88 # of file timestamp (or test happened to be executed at
88 89 # bad timing). Exit silently in this case, because running
89 90 # on other faster platforms can detect problems
90 91 pass
91 92
92 93 def testread(self):
93 94 with open(self._filename, 'wb') as f:
94 95 f.write(b'foobar\n')
95 96 file = atomictempfile(self._filename, mode='rb')
96 97 self.assertTrue(file.read(), b'foobar\n')
97 98 file.discard()
98 99
99 100 def testcontextmanagersuccess(self):
100 101 """When the context closes, the file is closed"""
101 102 with atomictempfile('foo') as f:
102 103 self.assertFalse(os.path.isfile('foo'))
103 104 f.write(b'argh\n')
104 105 self.assertTrue(os.path.isfile('foo'))
105 106
106 107 def testcontextmanagerfailure(self):
107 108 """On exception, the file is discarded"""
108 109 try:
109 110 with atomictempfile('foo') as f:
110 111 self.assertFalse(os.path.isfile('foo'))
111 112 f.write(b'argh\n')
112 113 raise ValueError
113 114 except ValueError:
114 115 pass
115 116 self.assertFalse(os.path.isfile('foo'))
116 117
117 118 if __name__ == '__main__':
118 119 import silenttestrunner
119 120 silenttestrunner.main(__name__)
@@ -1,79 +1,77 b''
1 1 from __future__ import absolute_import
2 2
3 3 import silenttestrunner
4 4 import unittest
5 5
6 6 from mercurial import util
7 7
8 8 class contextmanager(object):
9 9 def __init__(self, name, trace):
10 10 self.name = name
11 11 self.entered = False
12 12 self.exited = False
13 13 self.trace = trace
14 14
15 15 def __enter__(self):
16 16 self.entered = True
17 17 self.trace(('enter', self.name))
18 18 return self
19 19
20 20 def __exit__(self, exc_type, exc_val, exc_tb):
21 21 self.exited = exc_type, exc_val, exc_tb
22 22 self.trace(('exit', self.name))
23 23
24 24 def __repr__(self):
25 25 return '<ctx %r>' % self.name
26 26
27 27 class ctxerror(Exception):
28 28 pass
29 29
30 30 class raise_on_enter(contextmanager):
31 31 def __enter__(self):
32 32 self.trace(('raise', self.name))
33 33 raise ctxerror(self.name)
34 34
35 35 class raise_on_exit(contextmanager):
36 36 def __exit__(self, exc_type, exc_val, exc_tb):
37 37 self.trace(('raise', self.name))
38 38 raise ctxerror(self.name)
39 39
40 40 def ctxmgr(name, trace):
41 41 return lambda: contextmanager(name, trace)
42 42
43 43 class test_ctxmanager(unittest.TestCase):
44 44 def test_basics(self):
45 45 trace = []
46 46 addtrace = trace.append
47 47 with util.ctxmanager(ctxmgr('a', addtrace), ctxmgr('b', addtrace)) as c:
48 48 a, b = c.enter()
49 49 c.atexit(addtrace, ('atexit', 'x'))
50 50 c.atexit(addtrace, ('atexit', 'y'))
51 51 self.assertEqual(trace, [('enter', 'a'), ('enter', 'b'),
52 52 ('atexit', 'y'), ('atexit', 'x'),
53 53 ('exit', 'b'), ('exit', 'a')])
54 54
55 55 def test_raise_on_enter(self):
56 56 trace = []
57 57 addtrace = trace.append
58 def go():
58 with self.assertRaises(ctxerror):
59 59 with util.ctxmanager(ctxmgr('a', addtrace),
60 60 lambda: raise_on_enter('b', addtrace)) as c:
61 61 c.enter()
62 62 addtrace('unreachable')
63 self.assertRaises(ctxerror, go)
64 63 self.assertEqual(trace, [('enter', 'a'), ('raise', 'b'), ('exit', 'a')])
65 64
66 65 def test_raise_on_exit(self):
67 66 trace = []
68 67 addtrace = trace.append
69 def go():
68 with self.assertRaises(ctxerror):
70 69 with util.ctxmanager(ctxmgr('a', addtrace),
71 70 lambda: raise_on_exit('b', addtrace)) as c:
72 71 c.enter()
73 72 addtrace('running')
74 self.assertRaises(ctxerror, go)
75 73 self.assertEqual(trace, [('enter', 'a'), ('enter', 'b'), 'running',
76 74 ('raise', 'b'), ('exit', 'a')])
77 75
78 76 if __name__ == '__main__':
79 77 silenttestrunner.main(__name__)
@@ -1,298 +1,296 b''
1 1 from __future__ import absolute_import
2 2
3 3 import copy
4 4 import errno
5 5 import os
6 6 import silenttestrunner
7 7 import tempfile
8 8 import types
9 9 import unittest
10 10
11 11 from mercurial import (
12 12 error,
13 13 lock,
14 14 vfs as vfsmod,
15 15 )
16 16
17 17 testlockname = 'testlock'
18 18
19 19 # work around http://bugs.python.org/issue1515
20 20 if types.MethodType not in copy._deepcopy_dispatch:
21 21 def _deepcopy_method(x, memo):
22 22 return type(x)(x.im_func, copy.deepcopy(x.im_self, memo), x.im_class)
23 23 copy._deepcopy_dispatch[types.MethodType] = _deepcopy_method
24 24
25 25 class lockwrapper(lock.lock):
26 26 def __init__(self, pidoffset, *args, **kwargs):
27 27 # lock.lock.__init__() calls lock(), so the pidoffset assignment needs
28 28 # to be earlier
29 29 self._pidoffset = pidoffset
30 30 super(lockwrapper, self).__init__(*args, **kwargs)
31 31 def _getpid(self):
32 32 return super(lockwrapper, self)._getpid() + self._pidoffset
33 33
34 34 class teststate(object):
35 35 def __init__(self, testcase, dir, pidoffset=0):
36 36 self._testcase = testcase
37 37 self._acquirecalled = False
38 38 self._releasecalled = False
39 39 self._postreleasecalled = False
40 40 self.vfs = vfsmod.vfs(dir, audit=False)
41 41 self._pidoffset = pidoffset
42 42
43 43 def makelock(self, *args, **kwargs):
44 44 l = lockwrapper(self._pidoffset, self.vfs, testlockname,
45 45 releasefn=self.releasefn, acquirefn=self.acquirefn,
46 46 *args, **kwargs)
47 47 l.postrelease.append(self.postreleasefn)
48 48 return l
49 49
50 50 def acquirefn(self):
51 51 self._acquirecalled = True
52 52
53 53 def releasefn(self):
54 54 self._releasecalled = True
55 55
56 56 def postreleasefn(self):
57 57 self._postreleasecalled = True
58 58
59 59 def assertacquirecalled(self, called):
60 60 self._testcase.assertEqual(
61 61 self._acquirecalled, called,
62 62 'expected acquire to be %s but was actually %s' % (
63 63 self._tocalled(called),
64 64 self._tocalled(self._acquirecalled),
65 65 ))
66 66
67 67 def resetacquirefn(self):
68 68 self._acquirecalled = False
69 69
70 70 def assertreleasecalled(self, called):
71 71 self._testcase.assertEqual(
72 72 self._releasecalled, called,
73 73 'expected release to be %s but was actually %s' % (
74 74 self._tocalled(called),
75 75 self._tocalled(self._releasecalled),
76 76 ))
77 77
78 78 def assertpostreleasecalled(self, called):
79 79 self._testcase.assertEqual(
80 80 self._postreleasecalled, called,
81 81 'expected postrelease to be %s but was actually %s' % (
82 82 self._tocalled(called),
83 83 self._tocalled(self._postreleasecalled),
84 84 ))
85 85
86 86 def assertlockexists(self, exists):
87 87 actual = self.vfs.lexists(testlockname)
88 88 self._testcase.assertEqual(
89 89 actual, exists,
90 90 'expected lock to %s but actually did %s' % (
91 91 self._toexists(exists),
92 92 self._toexists(actual),
93 93 ))
94 94
95 95 def _tocalled(self, called):
96 96 if called:
97 97 return 'called'
98 98 else:
99 99 return 'not called'
100 100
101 101 def _toexists(self, exists):
102 102 if exists:
103 103 return 'exist'
104 104 else:
105 105 return 'not exist'
106 106
107 107 class testlock(unittest.TestCase):
108 108 def testlock(self):
109 109 state = teststate(self, tempfile.mkdtemp(dir=os.getcwd()))
110 110 lock = state.makelock()
111 111 state.assertacquirecalled(True)
112 112 lock.release()
113 113 state.assertreleasecalled(True)
114 114 state.assertpostreleasecalled(True)
115 115 state.assertlockexists(False)
116 116
117 117 def testrecursivelock(self):
118 118 state = teststate(self, tempfile.mkdtemp(dir=os.getcwd()))
119 119 lock = state.makelock()
120 120 state.assertacquirecalled(True)
121 121
122 122 state.resetacquirefn()
123 123 lock.lock()
124 124 # recursive lock should not call acquirefn again
125 125 state.assertacquirecalled(False)
126 126
127 127 lock.release() # brings lock refcount down from 2 to 1
128 128 state.assertreleasecalled(False)
129 129 state.assertpostreleasecalled(False)
130 130 state.assertlockexists(True)
131 131
132 132 lock.release() # releases the lock
133 133 state.assertreleasecalled(True)
134 134 state.assertpostreleasecalled(True)
135 135 state.assertlockexists(False)
136 136
137 137 def testlockfork(self):
138 138 state = teststate(self, tempfile.mkdtemp(dir=os.getcwd()))
139 139 lock = state.makelock()
140 140 state.assertacquirecalled(True)
141 141
142 142 # fake a fork
143 143 forklock = copy.deepcopy(lock)
144 144 forklock._pidoffset = 1
145 145 forklock.release()
146 146 state.assertreleasecalled(False)
147 147 state.assertpostreleasecalled(False)
148 148 state.assertlockexists(True)
149 149
150 150 # release the actual lock
151 151 lock.release()
152 152 state.assertreleasecalled(True)
153 153 state.assertpostreleasecalled(True)
154 154 state.assertlockexists(False)
155 155
156 156 def testinheritlock(self):
157 157 d = tempfile.mkdtemp(dir=os.getcwd())
158 158 parentstate = teststate(self, d)
159 159 parentlock = parentstate.makelock()
160 160 parentstate.assertacquirecalled(True)
161 161
162 162 # set up lock inheritance
163 163 with parentlock.inherit() as lockname:
164 164 parentstate.assertreleasecalled(True)
165 165 parentstate.assertpostreleasecalled(False)
166 166 parentstate.assertlockexists(True)
167 167
168 168 childstate = teststate(self, d, pidoffset=1)
169 169 childlock = childstate.makelock(parentlock=lockname)
170 170 childstate.assertacquirecalled(True)
171 171
172 172 childlock.release()
173 173 childstate.assertreleasecalled(True)
174 174 childstate.assertpostreleasecalled(False)
175 175 childstate.assertlockexists(True)
176 176
177 177 parentstate.resetacquirefn()
178 178
179 179 parentstate.assertacquirecalled(True)
180 180
181 181 parentlock.release()
182 182 parentstate.assertreleasecalled(True)
183 183 parentstate.assertpostreleasecalled(True)
184 184 parentstate.assertlockexists(False)
185 185
186 186 def testmultilock(self):
187 187 d = tempfile.mkdtemp(dir=os.getcwd())
188 188 state0 = teststate(self, d)
189 189 lock0 = state0.makelock()
190 190 state0.assertacquirecalled(True)
191 191
192 192 with lock0.inherit() as lock0name:
193 193 state0.assertreleasecalled(True)
194 194 state0.assertpostreleasecalled(False)
195 195 state0.assertlockexists(True)
196 196
197 197 state1 = teststate(self, d, pidoffset=1)
198 198 lock1 = state1.makelock(parentlock=lock0name)
199 199 state1.assertacquirecalled(True)
200 200
201 201 # from within lock1, acquire another lock
202 202 with lock1.inherit() as lock1name:
203 203 # since the file on disk is lock0's this should have the same
204 204 # name
205 205 self.assertEqual(lock0name, lock1name)
206 206
207 207 state2 = teststate(self, d, pidoffset=2)
208 208 lock2 = state2.makelock(parentlock=lock1name)
209 209 state2.assertacquirecalled(True)
210 210
211 211 lock2.release()
212 212 state2.assertreleasecalled(True)
213 213 state2.assertpostreleasecalled(False)
214 214 state2.assertlockexists(True)
215 215
216 216 state1.resetacquirefn()
217 217
218 218 state1.assertacquirecalled(True)
219 219
220 220 lock1.release()
221 221 state1.assertreleasecalled(True)
222 222 state1.assertpostreleasecalled(False)
223 223 state1.assertlockexists(True)
224 224
225 225 lock0.release()
226 226
227 227 def testinheritlockfork(self):
228 228 d = tempfile.mkdtemp(dir=os.getcwd())
229 229 parentstate = teststate(self, d)
230 230 parentlock = parentstate.makelock()
231 231 parentstate.assertacquirecalled(True)
232 232
233 233 # set up lock inheritance
234 234 with parentlock.inherit() as lockname:
235 235 childstate = teststate(self, d, pidoffset=1)
236 236 childlock = childstate.makelock(parentlock=lockname)
237 237 childstate.assertacquirecalled(True)
238 238
239 239 # fork the child lock
240 240 forkchildlock = copy.deepcopy(childlock)
241 241 forkchildlock._pidoffset += 1
242 242 forkchildlock.release()
243 243 childstate.assertreleasecalled(False)
244 244 childstate.assertpostreleasecalled(False)
245 245 childstate.assertlockexists(True)
246 246
247 247 # release the child lock
248 248 childlock.release()
249 249 childstate.assertreleasecalled(True)
250 250 childstate.assertpostreleasecalled(False)
251 251 childstate.assertlockexists(True)
252 252
253 253 parentlock.release()
254 254
255 255 def testinheritcheck(self):
256 256 d = tempfile.mkdtemp(dir=os.getcwd())
257 257 state = teststate(self, d)
258 258 def check():
259 259 raise error.LockInheritanceContractViolation('check failed')
260 260 lock = state.makelock(inheritchecker=check)
261 261 state.assertacquirecalled(True)
262 262
263 def tryinherit():
263 with self.assertRaises(error.LockInheritanceContractViolation):
264 264 with lock.inherit():
265 265 pass
266 266
267 self.assertRaises(error.LockInheritanceContractViolation, tryinherit)
268
269 267 lock.release()
270 268
271 269 def testfrequentlockunlock(self):
272 270 """This tests whether lock acquisition fails as expected, even if
273 271 (1) lock can't be acquired (makelock fails by EEXIST), and
274 272 (2) locker info can't be read in (readlock fails by ENOENT) while
275 273 retrying 5 times.
276 274 """
277 275
278 276 d = tempfile.mkdtemp(dir=os.getcwd())
279 277 state = teststate(self, d)
280 278
281 279 def emulatefrequentlock(*args):
282 280 raise OSError(errno.EEXIST, "File exists")
283 281 def emulatefrequentunlock(*args):
284 282 raise OSError(errno.ENOENT, "No such file or directory")
285 283
286 284 state.vfs.makelock = emulatefrequentlock
287 285 state.vfs.readlock = emulatefrequentunlock
288 286
289 287 try:
290 288 state.makelock(timeout=0)
291 289 self.fail("unexpected lock acquisition")
292 290 except error.LockHeld as why:
293 291 self.assertTrue(why.errno == errno.ETIMEDOUT)
294 292 self.assertTrue(why.locker == "")
295 293 state.assertlockexists(False)
296 294
297 295 if __name__ == '__main__':
298 296 silenttestrunner.main(__name__)
@@ -1,487 +1,491 b''
1 1 from __future__ import absolute_import
2 2
3 3 import binascii
4 4 import itertools
5 5 import silenttestrunner
6 6 import unittest
7 7
8 8 from mercurial import (
9 9 manifest as manifestmod,
10 10 match as matchmod,
11 11 )
12 12
13 13 EMTPY_MANIFEST = ''
14 14 EMTPY_MANIFEST_V2 = '\0\n'
15 15
16 16 HASH_1 = '1' * 40
17 17 BIN_HASH_1 = binascii.unhexlify(HASH_1)
18 18 HASH_2 = 'f' * 40
19 19 BIN_HASH_2 = binascii.unhexlify(HASH_2)
20 20 HASH_3 = '1234567890abcdef0987654321deadbeef0fcafe'
21 21 BIN_HASH_3 = binascii.unhexlify(HASH_3)
22 22 A_SHORT_MANIFEST = (
23 23 'bar/baz/qux.py\0%(hash2)s%(flag2)s\n'
24 24 'foo\0%(hash1)s%(flag1)s\n'
25 25 ) % {'hash1': HASH_1,
26 26 'flag1': '',
27 27 'hash2': HASH_2,
28 28 'flag2': 'l',
29 29 }
30 30
31 31 # Same data as A_SHORT_MANIFEST
32 32 A_SHORT_MANIFEST_V2 = (
33 33 '\0\n'
34 34 '\x00bar/baz/qux.py\0%(flag2)s\n%(hash2)s\n'
35 35 '\x00foo\0%(flag1)s\n%(hash1)s\n'
36 36 ) % {'hash1': BIN_HASH_1,
37 37 'flag1': '',
38 38 'hash2': BIN_HASH_2,
39 39 'flag2': 'l',
40 40 }
41 41
42 42 # Same data as A_SHORT_MANIFEST
43 43 A_METADATA_MANIFEST = (
44 44 '\0foo\0bar\n'
45 45 '\x00bar/baz/qux.py\0%(flag2)s\0foo\0bar\n%(hash2)s\n' # flag and metadata
46 46 '\x00foo\0%(flag1)s\0foo\n%(hash1)s\n' # no flag, but metadata
47 47 ) % {'hash1': BIN_HASH_1,
48 48 'flag1': '',
49 49 'hash2': BIN_HASH_2,
50 50 'flag2': 'l',
51 51 }
52 52
53 53 A_STEM_COMPRESSED_MANIFEST = (
54 54 '\0\n'
55 55 '\x00bar/baz/qux.py\0%(flag2)s\n%(hash2)s\n'
56 56 '\x04qux/foo.py\0%(flag1)s\n%(hash1)s\n' # simple case of 4 stem chars
57 57 '\x0az.py\0%(flag1)s\n%(hash1)s\n' # tricky newline = 10 stem characters
58 58 '\x00%(verylongdir)sx/x\0\n%(hash1)s\n'
59 59 '\xffx/y\0\n%(hash2)s\n' # more than 255 stem chars
60 60 ) % {'hash1': BIN_HASH_1,
61 61 'flag1': '',
62 62 'hash2': BIN_HASH_2,
63 63 'flag2': 'l',
64 64 'verylongdir': 255 * 'x',
65 65 }
66 66
67 67 A_DEEPER_MANIFEST = (
68 68 'a/b/c/bar.py\0%(hash3)s%(flag1)s\n'
69 69 'a/b/c/bar.txt\0%(hash1)s%(flag1)s\n'
70 70 'a/b/c/foo.py\0%(hash3)s%(flag1)s\n'
71 71 'a/b/c/foo.txt\0%(hash2)s%(flag2)s\n'
72 72 'a/b/d/baz.py\0%(hash3)s%(flag1)s\n'
73 73 'a/b/d/qux.py\0%(hash1)s%(flag2)s\n'
74 74 'a/b/d/ten.txt\0%(hash3)s%(flag2)s\n'
75 75 'a/b/dog.py\0%(hash3)s%(flag1)s\n'
76 76 'a/b/fish.py\0%(hash2)s%(flag1)s\n'
77 77 'a/c/london.py\0%(hash3)s%(flag2)s\n'
78 78 'a/c/paper.txt\0%(hash2)s%(flag2)s\n'
79 79 'a/c/paris.py\0%(hash2)s%(flag1)s\n'
80 80 'a/d/apple.py\0%(hash3)s%(flag1)s\n'
81 81 'a/d/pizza.py\0%(hash3)s%(flag2)s\n'
82 82 'a/green.py\0%(hash1)s%(flag2)s\n'
83 83 'a/purple.py\0%(hash2)s%(flag1)s\n'
84 84 'app.py\0%(hash3)s%(flag1)s\n'
85 85 'readme.txt\0%(hash2)s%(flag1)s\n'
86 86 ) % {'hash1': HASH_1,
87 87 'flag1': '',
88 88 'hash2': HASH_2,
89 89 'flag2': 'l',
90 90 'hash3': HASH_3,
91 91 }
92 92
93 93 HUGE_MANIFEST_ENTRIES = 200001
94 94
95 95 A_HUGE_MANIFEST = ''.join(sorted(
96 96 'file%d\0%s%s\n' % (i, h, f) for i, h, f in
97 97 itertools.izip(xrange(200001),
98 98 itertools.cycle((HASH_1, HASH_2)),
99 99 itertools.cycle(('', 'x', 'l')))))
100 100
101 101 class basemanifesttests(object):
102 102 def parsemanifest(self, text):
103 103 raise NotImplementedError('parsemanifest not implemented by test case')
104 104
105 105 def assertIn(self, thing, container, msg=None):
106 106 # assertIn new in 2.7, use it if available, otherwise polyfill
107 107 sup = getattr(unittest.TestCase, 'assertIn', False)
108 108 if sup:
109 109 return sup(self, thing, container, msg=msg)
110 110 if not msg:
111 111 msg = 'Expected %r in %r' % (thing, container)
112 112 self.assert_(thing in container, msg)
113 113
114 114 def testEmptyManifest(self):
115 115 m = self.parsemanifest(EMTPY_MANIFEST)
116 116 self.assertEqual(0, len(m))
117 117 self.assertEqual([], list(m))
118 118
119 119 def testEmptyManifestv2(self):
120 120 m = self.parsemanifest(EMTPY_MANIFEST_V2)
121 121 self.assertEqual(0, len(m))
122 122 self.assertEqual([], list(m))
123 123
124 124 def testManifest(self):
125 125 m = self.parsemanifest(A_SHORT_MANIFEST)
126 126 self.assertEqual(['bar/baz/qux.py', 'foo'], list(m))
127 127 self.assertEqual(BIN_HASH_2, m['bar/baz/qux.py'])
128 128 self.assertEqual('l', m.flags('bar/baz/qux.py'))
129 129 self.assertEqual(BIN_HASH_1, m['foo'])
130 130 self.assertEqual('', m.flags('foo'))
131 self.assertRaises(KeyError, lambda : m['wat'])
131 with self.assertRaises(KeyError):
132 m['wat']
132 133
133 134 def testParseManifestV2(self):
134 135 m1 = self.parsemanifest(A_SHORT_MANIFEST)
135 136 m2 = self.parsemanifest(A_SHORT_MANIFEST_V2)
136 137 # Should have same content as A_SHORT_MANIFEST
137 138 self.assertEqual(m1.text(), m2.text())
138 139
139 140 def testParseManifestMetadata(self):
140 141 # Metadata is for future-proofing and should be accepted but ignored
141 142 m = self.parsemanifest(A_METADATA_MANIFEST)
142 143 self.assertEqual(A_SHORT_MANIFEST, m.text())
143 144
144 145 def testParseManifestStemCompression(self):
145 146 m = self.parsemanifest(A_STEM_COMPRESSED_MANIFEST)
146 147 self.assertIn('bar/baz/qux.py', m)
147 148 self.assertIn('bar/qux/foo.py', m)
148 149 self.assertIn('bar/qux/foz.py', m)
149 150 self.assertIn(256 * 'x' + '/x', m)
150 151 self.assertIn(256 * 'x' + '/y', m)
151 152 self.assertEqual(A_STEM_COMPRESSED_MANIFEST, m.text(usemanifestv2=True))
152 153
153 154 def testTextV2(self):
154 155 m1 = self.parsemanifest(A_SHORT_MANIFEST)
155 156 v2text = m1.text(usemanifestv2=True)
156 157 self.assertEqual(A_SHORT_MANIFEST_V2, v2text)
157 158
158 159 def testSetItem(self):
159 160 want = BIN_HASH_1
160 161
161 162 m = self.parsemanifest(EMTPY_MANIFEST)
162 163 m['a'] = want
163 164 self.assertIn('a', m)
164 165 self.assertEqual(want, m['a'])
165 166 self.assertEqual('a\0' + HASH_1 + '\n', m.text())
166 167
167 168 m = self.parsemanifest(A_SHORT_MANIFEST)
168 169 m['a'] = want
169 170 self.assertEqual(want, m['a'])
170 171 self.assertEqual('a\0' + HASH_1 + '\n' + A_SHORT_MANIFEST,
171 172 m.text())
172 173
173 174 def testSetFlag(self):
174 175 want = 'x'
175 176
176 177 m = self.parsemanifest(EMTPY_MANIFEST)
177 178 # first add a file; a file-less flag makes no sense
178 179 m['a'] = BIN_HASH_1
179 180 m.setflag('a', want)
180 181 self.assertEqual(want, m.flags('a'))
181 182 self.assertEqual('a\0' + HASH_1 + want + '\n', m.text())
182 183
183 184 m = self.parsemanifest(A_SHORT_MANIFEST)
184 185 # first add a file; a file-less flag makes no sense
185 186 m['a'] = BIN_HASH_1
186 187 m.setflag('a', want)
187 188 self.assertEqual(want, m.flags('a'))
188 189 self.assertEqual('a\0' + HASH_1 + want + '\n' + A_SHORT_MANIFEST,
189 190 m.text())
190 191
191 192 def testCopy(self):
192 193 m = self.parsemanifest(A_SHORT_MANIFEST)
193 194 m['a'] = BIN_HASH_1
194 195 m2 = m.copy()
195 196 del m
196 197 del m2 # make sure we don't double free() anything
197 198
198 199 def testCompaction(self):
199 200 unhex = binascii.unhexlify
200 201 h1, h2 = unhex(HASH_1), unhex(HASH_2)
201 202 m = self.parsemanifest(A_SHORT_MANIFEST)
202 203 m['alpha'] = h1
203 204 m['beta'] = h2
204 205 del m['foo']
205 206 want = 'alpha\0%s\nbar/baz/qux.py\0%sl\nbeta\0%s\n' % (
206 207 HASH_1, HASH_2, HASH_2)
207 208 self.assertEqual(want, m.text())
208 209 self.assertEqual(3, len(m))
209 210 self.assertEqual(['alpha', 'bar/baz/qux.py', 'beta'], list(m))
210 211 self.assertEqual(h1, m['alpha'])
211 212 self.assertEqual(h2, m['bar/baz/qux.py'])
212 213 self.assertEqual(h2, m['beta'])
213 214 self.assertEqual('', m.flags('alpha'))
214 215 self.assertEqual('l', m.flags('bar/baz/qux.py'))
215 216 self.assertEqual('', m.flags('beta'))
216 self.assertRaises(KeyError, lambda : m['foo'])
217 with self.assertRaises(KeyError):
218 m['foo']
217 219
218 220 def testSetGetNodeSuffix(self):
219 221 clean = self.parsemanifest(A_SHORT_MANIFEST)
220 222 m = self.parsemanifest(A_SHORT_MANIFEST)
221 223 h = m['foo']
222 224 f = m.flags('foo')
223 225 want = h + 'a'
224 226 # Merge code wants to set 21-byte fake hashes at times
225 227 m['foo'] = want
226 228 self.assertEqual(want, m['foo'])
227 229 self.assertEqual([('bar/baz/qux.py', BIN_HASH_2),
228 230 ('foo', BIN_HASH_1 + 'a')],
229 231 list(m.iteritems()))
230 232 # Sometimes it even tries a 22-byte fake hash, but we can
231 233 # return 21 and it'll work out
232 234 m['foo'] = want + '+'
233 235 self.assertEqual(want, m['foo'])
234 236 # make sure the suffix survives a copy
235 237 match = matchmod.match('', '', ['re:foo'])
236 238 m2 = m.matches(match)
237 239 self.assertEqual(want, m2['foo'])
238 240 self.assertEqual(1, len(m2))
239 241 m2 = m.copy()
240 242 self.assertEqual(want, m2['foo'])
241 243 # suffix with iteration
242 244 self.assertEqual([('bar/baz/qux.py', BIN_HASH_2),
243 245 ('foo', want)],
244 246 list(m.iteritems()))
245 247
246 248 # shows up in diff
247 249 self.assertEqual({'foo': ((want, f), (h, ''))}, m.diff(clean))
248 250 self.assertEqual({'foo': ((h, ''), (want, f))}, clean.diff(m))
249 251
250 252 def testMatchException(self):
251 253 m = self.parsemanifest(A_SHORT_MANIFEST)
252 254 match = matchmod.match('', '', ['re:.*'])
253 255 def filt(path):
254 256 if path == 'foo':
255 257 assert False
256 258 return True
257 259 match.matchfn = filt
258 self.assertRaises(AssertionError, m.matches, match)
260 with self.assertRaises(AssertionError):
261 m.matches(match)
259 262
260 263 def testRemoveItem(self):
261 264 m = self.parsemanifest(A_SHORT_MANIFEST)
262 265 del m['foo']
263 self.assertRaises(KeyError, lambda : m['foo'])
266 with self.assertRaises(KeyError):
267 m['foo']
264 268 self.assertEqual(1, len(m))
265 269 self.assertEqual(1, len(list(m)))
266 270 # now restore and make sure everything works right
267 271 m['foo'] = 'a' * 20
268 272 self.assertEqual(2, len(m))
269 273 self.assertEqual(2, len(list(m)))
270 274
271 275 def testManifestDiff(self):
272 276 MISSING = (None, '')
273 277 addl = 'z-only-in-left\0' + HASH_1 + '\n'
274 278 addr = 'z-only-in-right\0' + HASH_2 + 'x\n'
275 279 left = self.parsemanifest(
276 280 A_SHORT_MANIFEST.replace(HASH_1, HASH_3 + 'x') + addl)
277 281 right = self.parsemanifest(A_SHORT_MANIFEST + addr)
278 282 want = {
279 283 'foo': ((BIN_HASH_3, 'x'),
280 284 (BIN_HASH_1, '')),
281 285 'z-only-in-left': ((BIN_HASH_1, ''), MISSING),
282 286 'z-only-in-right': (MISSING, (BIN_HASH_2, 'x')),
283 287 }
284 288 self.assertEqual(want, left.diff(right))
285 289
286 290 want = {
287 291 'bar/baz/qux.py': (MISSING, (BIN_HASH_2, 'l')),
288 292 'foo': (MISSING, (BIN_HASH_3, 'x')),
289 293 'z-only-in-left': (MISSING, (BIN_HASH_1, '')),
290 294 }
291 295 self.assertEqual(want, self.parsemanifest(EMTPY_MANIFEST).diff(left))
292 296
293 297 want = {
294 298 'bar/baz/qux.py': ((BIN_HASH_2, 'l'), MISSING),
295 299 'foo': ((BIN_HASH_3, 'x'), MISSING),
296 300 'z-only-in-left': ((BIN_HASH_1, ''), MISSING),
297 301 }
298 302 self.assertEqual(want, left.diff(self.parsemanifest(EMTPY_MANIFEST)))
299 303 copy = right.copy()
300 304 del copy['z-only-in-right']
301 305 del right['foo']
302 306 want = {
303 307 'foo': (MISSING, (BIN_HASH_1, '')),
304 308 'z-only-in-right': ((BIN_HASH_2, 'x'), MISSING),
305 309 }
306 310 self.assertEqual(want, right.diff(copy))
307 311
308 312 short = self.parsemanifest(A_SHORT_MANIFEST)
309 313 pruned = short.copy()
310 314 del pruned['foo']
311 315 want = {
312 316 'foo': ((BIN_HASH_1, ''), MISSING),
313 317 }
314 318 self.assertEqual(want, short.diff(pruned))
315 319 want = {
316 320 'foo': (MISSING, (BIN_HASH_1, '')),
317 321 }
318 322 self.assertEqual(want, pruned.diff(short))
319 323 want = {
320 324 'bar/baz/qux.py': None,
321 325 'foo': (MISSING, (BIN_HASH_1, '')),
322 326 }
323 327 self.assertEqual(want, pruned.diff(short, clean=True))
324 328
325 329 def testReversedLines(self):
326 330 backwards = ''.join(
327 331 l + '\n' for l in reversed(A_SHORT_MANIFEST.split('\n')) if l)
328 332 try:
329 333 self.parsemanifest(backwards)
330 334 self.fail('Should have raised ValueError')
331 335 except ValueError as v:
332 336 self.assertIn('Manifest lines not in sorted order.', str(v))
333 337
334 338 def testNoTerminalNewline(self):
335 339 try:
336 340 self.parsemanifest(A_SHORT_MANIFEST + 'wat')
337 341 self.fail('Should have raised ValueError')
338 342 except ValueError as v:
339 343 self.assertIn('Manifest did not end in a newline.', str(v))
340 344
341 345 def testNoNewLineAtAll(self):
342 346 try:
343 347 self.parsemanifest('wat')
344 348 self.fail('Should have raised ValueError')
345 349 except ValueError as v:
346 350 self.assertIn('Manifest did not end in a newline.', str(v))
347 351
348 352 def testHugeManifest(self):
349 353 m = self.parsemanifest(A_HUGE_MANIFEST)
350 354 self.assertEqual(HUGE_MANIFEST_ENTRIES, len(m))
351 355 self.assertEqual(len(m), len(list(m)))
352 356
353 357 def testMatchesMetadata(self):
354 358 '''Tests matches() for a few specific files to make sure that both
355 359 the set of files as well as their flags and nodeids are correct in
356 360 the resulting manifest.'''
357 361 m = self.parsemanifest(A_HUGE_MANIFEST)
358 362
359 363 match = matchmod.match('/', '',
360 364 ['file1', 'file200', 'file300'], exact=True)
361 365 m2 = m.matches(match)
362 366
363 367 w = ('file1\0%sx\n'
364 368 'file200\0%sl\n'
365 369 'file300\0%s\n') % (HASH_2, HASH_1, HASH_1)
366 370 self.assertEqual(w, m2.text())
367 371
368 372 def testMatchesNonexistentFile(self):
369 373 '''Tests matches() for a small set of specific files, including one
370 374 nonexistent file to make sure in only matches against existing files.
371 375 '''
372 376 m = self.parsemanifest(A_DEEPER_MANIFEST)
373 377
374 378 match = matchmod.match('/', '',
375 379 ['a/b/c/bar.txt', 'a/b/d/qux.py', 'readme.txt', 'nonexistent'],
376 380 exact=True)
377 381 m2 = m.matches(match)
378 382
379 383 self.assertEqual(
380 384 ['a/b/c/bar.txt', 'a/b/d/qux.py', 'readme.txt'],
381 385 m2.keys())
382 386
383 387 def testMatchesNonexistentDirectory(self):
384 388 '''Tests matches() for a relpath match on a directory that doesn't
385 389 actually exist.'''
386 390 m = self.parsemanifest(A_DEEPER_MANIFEST)
387 391
388 392 match = matchmod.match('/', '', ['a/f'], default='relpath')
389 393 m2 = m.matches(match)
390 394
391 395 self.assertEqual([], m2.keys())
392 396
393 397 def testMatchesExactLarge(self):
394 398 '''Tests matches() for files matching a large list of exact files.
395 399 '''
396 400 m = self.parsemanifest(A_HUGE_MANIFEST)
397 401
398 402 flist = m.keys()[80:300]
399 403 match = matchmod.match('/', '', flist, exact=True)
400 404 m2 = m.matches(match)
401 405
402 406 self.assertEqual(flist, m2.keys())
403 407
404 408 def testMatchesFull(self):
405 409 '''Tests matches() for what should be a full match.'''
406 410 m = self.parsemanifest(A_DEEPER_MANIFEST)
407 411
408 412 match = matchmod.match('/', '', [''])
409 413 m2 = m.matches(match)
410 414
411 415 self.assertEqual(m.keys(), m2.keys())
412 416
413 417 def testMatchesDirectory(self):
414 418 '''Tests matches() on a relpath match on a directory, which should
415 419 match against all files within said directory.'''
416 420 m = self.parsemanifest(A_DEEPER_MANIFEST)
417 421
418 422 match = matchmod.match('/', '', ['a/b'], default='relpath')
419 423 m2 = m.matches(match)
420 424
421 425 self.assertEqual([
422 426 'a/b/c/bar.py', 'a/b/c/bar.txt', 'a/b/c/foo.py', 'a/b/c/foo.txt',
423 427 'a/b/d/baz.py', 'a/b/d/qux.py', 'a/b/d/ten.txt', 'a/b/dog.py',
424 428 'a/b/fish.py'], m2.keys())
425 429
426 430 def testMatchesExactPath(self):
427 431 '''Tests matches() on an exact match on a directory, which should
428 432 result in an empty manifest because you can't perform an exact match
429 433 against a directory.'''
430 434 m = self.parsemanifest(A_DEEPER_MANIFEST)
431 435
432 436 match = matchmod.match('/', '', ['a/b'], exact=True)
433 437 m2 = m.matches(match)
434 438
435 439 self.assertEqual([], m2.keys())
436 440
437 441 def testMatchesCwd(self):
438 442 '''Tests matches() on a relpath match with the current directory ('.')
439 443 when not in the root directory.'''
440 444 m = self.parsemanifest(A_DEEPER_MANIFEST)
441 445
442 446 match = matchmod.match('/', 'a/b', ['.'], default='relpath')
443 447 m2 = m.matches(match)
444 448
445 449 self.assertEqual([
446 450 'a/b/c/bar.py', 'a/b/c/bar.txt', 'a/b/c/foo.py', 'a/b/c/foo.txt',
447 451 'a/b/d/baz.py', 'a/b/d/qux.py', 'a/b/d/ten.txt', 'a/b/dog.py',
448 452 'a/b/fish.py'], m2.keys())
449 453
450 454 def testMatchesWithPattern(self):
451 455 '''Tests matches() for files matching a pattern that reside
452 456 deeper than the specified directory.'''
453 457 m = self.parsemanifest(A_DEEPER_MANIFEST)
454 458
455 459 match = matchmod.match('/', '', ['a/b/*/*.txt'])
456 460 m2 = m.matches(match)
457 461
458 462 self.assertEqual(
459 463 ['a/b/c/bar.txt', 'a/b/c/foo.txt', 'a/b/d/ten.txt'],
460 464 m2.keys())
461 465
462 466 class testmanifestdict(unittest.TestCase, basemanifesttests):
463 467 def parsemanifest(self, text):
464 468 return manifestmod.manifestdict(text)
465 469
466 470 class testtreemanifest(unittest.TestCase, basemanifesttests):
467 471 def parsemanifest(self, text):
468 472 return manifestmod.treemanifest('', text)
469 473
470 474 def testWalkSubtrees(self):
471 475 m = self.parsemanifest(A_DEEPER_MANIFEST)
472 476
473 477 dirs = [s._dir for s in m.walksubtrees()]
474 478 self.assertEqual(
475 479 sorted(['', 'a/', 'a/c/', 'a/d/', 'a/b/', 'a/b/c/', 'a/b/d/']),
476 480 sorted(dirs)
477 481 )
478 482
479 483 match = matchmod.match('/', '', ['path:a/b/'])
480 484 dirs = [s._dir for s in m.walksubtrees(matcher=match)]
481 485 self.assertEqual(
482 486 sorted(['a/b/', 'a/b/c/', 'a/b/d/']),
483 487 sorted(dirs)
484 488 )
485 489
486 490 if __name__ == '__main__':
487 491 silenttestrunner.main(__name__)
@@ -1,84 +1,84 b''
1 1 from __future__ import absolute_import
2 2
3 3 import unittest
4 4 import silenttestrunner
5 5
6 6 from mercurial import (
7 7 error,
8 8 scmutil,
9 9 )
10 10
11 11 class mockfile(object):
12 12 def __init__(self, name, fs):
13 13 self.name = name
14 14 self.fs = fs
15 15
16 16 def __enter__(self):
17 17 return self
18 18
19 19 def __exit__(self, *args, **kwargs):
20 20 pass
21 21
22 22 def write(self, text):
23 23 self.fs.contents[self.name] = text
24 24
25 25 def read(self):
26 26 return self.fs.contents[self.name]
27 27
28 28 class mockvfs(object):
29 29 def __init__(self):
30 30 self.contents = {}
31 31
32 32 def read(self, path):
33 33 return mockfile(path, self).read()
34 34
35 35 def readlines(self, path):
36 36 # lines need to contain the trailing '\n' to mock the real readlines
37 37 return [l for l in mockfile(path, self).read().splitlines(True)]
38 38
39 39 def __call__(self, path, mode, atomictemp):
40 40 return mockfile(path, self)
41 41
42 42 class testsimplekeyvaluefile(unittest.TestCase):
43 43 def setUp(self):
44 44 self.vfs = mockvfs()
45 45
46 46 def testbasicwritingiandreading(self):
47 47 dw = {'key1': 'value1', 'Key2': 'value2'}
48 48 scmutil.simplekeyvaluefile(self.vfs, 'kvfile').write(dw)
49 49 self.assertEqual(sorted(self.vfs.read('kvfile').split('\n')),
50 50 ['', 'Key2=value2', 'key1=value1'])
51 51 dr = scmutil.simplekeyvaluefile(self.vfs, 'kvfile').read()
52 52 self.assertEqual(dr, dw)
53 53
54 54 def testinvalidkeys(self):
55 55 d = {'0key1': 'value1', 'Key2': 'value2'}
56 self.assertRaises(error.ProgrammingError,
57 scmutil.simplekeyvaluefile(self.vfs, 'kvfile').write,
58 d)
56 with self.assertRaisesRegexp(error.ProgrammingError,
57 'keys must start with a letter.*'):
58 scmutil.simplekeyvaluefile(self.vfs, 'kvfile').write(d)
59
59 60 d = {'key1@': 'value1', 'Key2': 'value2'}
60 self.assertRaises(error.ProgrammingError,
61 scmutil.simplekeyvaluefile(self.vfs, 'kvfile').write,
62 d)
61 with self.assertRaisesRegexp(error.ProgrammingError, 'invalid key.*'):
62 scmutil.simplekeyvaluefile(self.vfs, 'kvfile').write(d)
63 63
64 64 def testinvalidvalues(self):
65 65 d = {'key1': 'value1', 'Key2': 'value2\n'}
66 self.assertRaises(error.ProgrammingError,
67 scmutil.simplekeyvaluefile(self.vfs, 'kvfile').write,
68 d)
66 with self.assertRaisesRegexp(error.ProgrammingError, 'invalid val.*'):
67 scmutil.simplekeyvaluefile(self.vfs, 'kvfile').write(d)
69 68
70 69 def testcorruptedfile(self):
71 70 self.vfs.contents['badfile'] = 'ababagalamaga\n'
72 self.assertRaises(error.CorruptedState,
73 scmutil.simplekeyvaluefile(self.vfs, 'badfile').read)
71 with self.assertRaisesRegexp(error.CorruptedState,
72 'dictionary.*element.*'):
73 scmutil.simplekeyvaluefile(self.vfs, 'badfile').read()
74 74
75 75 def testfirstline(self):
76 76 dw = {'key1': 'value1'}
77 77 scmutil.simplekeyvaluefile(self.vfs, 'fl').write(dw, firstline='1.0')
78 78 self.assertEqual(self.vfs.read('fl'), '1.0\nkey1=value1\n')
79 79 dr = scmutil.simplekeyvaluefile(self.vfs, 'fl')\
80 80 .read(firstlinenonkeyval=True)
81 81 self.assertEqual(dr, {'__firstline': '1.0', 'key1': 'value1'})
82 82
83 83 if __name__ == "__main__":
84 84 silenttestrunner.main(__name__)
@@ -1,360 +1,361 b''
1 1 # Copyright (C) 2004, 2005 Canonical Ltd
2 2 #
3 3 # This program is free software; you can redistribute it and/or modify
4 4 # it under the terms of the GNU General Public License as published by
5 5 # the Free Software Foundation; either version 2 of the License, or
6 6 # (at your option) any later version.
7 7 #
8 8 # This program is distributed in the hope that it will be useful,
9 9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 11 # GNU General Public License for more details.
12 12 #
13 13 # You should have received a copy of the GNU General Public License
14 14 # along with this program; if not, see <http://www.gnu.org/licenses/>.
15 15
16 16 from __future__ import absolute_import
17 17
18 18 import unittest
19 19 from mercurial import (
20 20 error,
21 21 simplemerge,
22 22 util,
23 23 )
24 24
25 25 TestCase = unittest.TestCase
26 26 # bzr compatible interface, for the tests
27 27 class Merge3(simplemerge.Merge3Text):
28 28 """3-way merge of texts.
29 29
30 30 Given BASE, OTHER, THIS, tries to produce a combined text
31 31 incorporating the changes from both BASE->OTHER and BASE->THIS.
32 32 All three will typically be sequences of lines."""
33 33 def __init__(self, base, a, b):
34 34 basetext = '\n'.join([i.strip('\n') for i in base] + [''])
35 35 atext = '\n'.join([i.strip('\n') for i in a] + [''])
36 36 btext = '\n'.join([i.strip('\n') for i in b] + [''])
37 37 if util.binary(basetext) or util.binary(atext) or util.binary(btext):
38 38 raise error.Abort("don't know how to merge binary files")
39 39 simplemerge.Merge3Text.__init__(self, basetext, atext, btext,
40 40 base, a, b)
41 41
42 42 CantReprocessAndShowBase = simplemerge.CantReprocessAndShowBase
43 43
44 44 def split_lines(t):
45 45 return util.stringio(t).readlines()
46 46
47 47 ############################################################
48 48 # test case data from the gnu diffutils manual
49 49 # common base
50 50 TZU = split_lines(""" The Nameless is the origin of Heaven and Earth;
51 51 The named is the mother of all things.
52 52
53 53 Therefore let there always be non-being,
54 54 so we may see their subtlety,
55 55 And let there always be being,
56 56 so we may see their outcome.
57 57 The two are the same,
58 58 But after they are produced,
59 59 they have different names.
60 60 They both may be called deep and profound.
61 61 Deeper and more profound,
62 62 The door of all subtleties!
63 63 """)
64 64
65 65 LAO = split_lines(""" The Way that can be told of is not the eternal Way;
66 66 The name that can be named is not the eternal name.
67 67 The Nameless is the origin of Heaven and Earth;
68 68 The Named is the mother of all things.
69 69 Therefore let there always be non-being,
70 70 so we may see their subtlety,
71 71 And let there always be being,
72 72 so we may see their outcome.
73 73 The two are the same,
74 74 But after they are produced,
75 75 they have different names.
76 76 """)
77 77
78 78
79 79 TAO = split_lines(""" The Way that can be told of is not the eternal Way;
80 80 The name that can be named is not the eternal name.
81 81 The Nameless is the origin of Heaven and Earth;
82 82 The named is the mother of all things.
83 83
84 84 Therefore let there always be non-being,
85 85 so we may see their subtlety,
86 86 And let there always be being,
87 87 so we may see their result.
88 88 The two are the same,
89 89 But after they are produced,
90 90 they have different names.
91 91
92 92 -- The Way of Lao-Tzu, tr. Wing-tsit Chan
93 93
94 94 """)
95 95
96 96 MERGED_RESULT = split_lines("""\
97 97 The Way that can be told of is not the eternal Way;
98 98 The name that can be named is not the eternal name.
99 99 The Nameless is the origin of Heaven and Earth;
100 100 The Named is the mother of all things.
101 101 Therefore let there always be non-being,
102 102 so we may see their subtlety,
103 103 And let there always be being,
104 104 so we may see their result.
105 105 The two are the same,
106 106 But after they are produced,
107 107 they have different names.
108 108 <<<<<<< LAO
109 109 =======
110 110
111 111 -- The Way of Lao-Tzu, tr. Wing-tsit Chan
112 112
113 113 >>>>>>> TAO
114 114 """)
115 115
116 116 class TestMerge3(TestCase):
117 117 def log(self, msg):
118 118 pass
119 119
120 120 def test_no_changes(self):
121 121 """No conflicts because nothing changed"""
122 122 m3 = Merge3(['aaa', 'bbb'],
123 123 ['aaa', 'bbb'],
124 124 ['aaa', 'bbb'])
125 125
126 126 self.assertEquals(m3.find_unconflicted(),
127 127 [(0, 2)])
128 128
129 129 self.assertEquals(list(m3.find_sync_regions()),
130 130 [(0, 2,
131 131 0, 2,
132 132 0, 2),
133 133 (2, 2, 2, 2, 2, 2)])
134 134
135 135 self.assertEquals(list(m3.merge_regions()),
136 136 [('unchanged', 0, 2)])
137 137
138 138 self.assertEquals(list(m3.merge_groups()),
139 139 [('unchanged', ['aaa', 'bbb'])])
140 140
141 141 def test_front_insert(self):
142 142 m3 = Merge3(['zz'],
143 143 ['aaa', 'bbb', 'zz'],
144 144 ['zz'])
145 145
146 146 # todo: should use a sentinel at end as from get_matching_blocks
147 147 # to match without zz
148 148 self.assertEquals(list(m3.find_sync_regions()),
149 149 [(0, 1, 2, 3, 0, 1),
150 150 (1, 1, 3, 3, 1, 1)])
151 151
152 152 self.assertEquals(list(m3.merge_regions()),
153 153 [('a', 0, 2),
154 154 ('unchanged', 0, 1)])
155 155
156 156 self.assertEquals(list(m3.merge_groups()),
157 157 [('a', ['aaa', 'bbb']),
158 158 ('unchanged', ['zz'])])
159 159
160 160 def test_null_insert(self):
161 161 m3 = Merge3([],
162 162 ['aaa', 'bbb'],
163 163 [])
164 164 # todo: should use a sentinel at end as from get_matching_blocks
165 165 # to match without zz
166 166 self.assertEquals(list(m3.find_sync_regions()),
167 167 [(0, 0, 2, 2, 0, 0)])
168 168
169 169 self.assertEquals(list(m3.merge_regions()),
170 170 [('a', 0, 2)])
171 171
172 172 self.assertEquals(list(m3.merge_lines()),
173 173 ['aaa', 'bbb'])
174 174
175 175 def test_no_conflicts(self):
176 176 """No conflicts because only one side changed"""
177 177 m3 = Merge3(['aaa', 'bbb'],
178 178 ['aaa', '111', 'bbb'],
179 179 ['aaa', 'bbb'])
180 180
181 181 self.assertEquals(m3.find_unconflicted(),
182 182 [(0, 1), (1, 2)])
183 183
184 184 self.assertEquals(list(m3.find_sync_regions()),
185 185 [(0, 1, 0, 1, 0, 1),
186 186 (1, 2, 2, 3, 1, 2),
187 187 (2, 2, 3, 3, 2, 2)])
188 188
189 189 self.assertEquals(list(m3.merge_regions()),
190 190 [('unchanged', 0, 1),
191 191 ('a', 1, 2),
192 192 ('unchanged', 1, 2)])
193 193
194 194 def test_append_a(self):
195 195 m3 = Merge3(['aaa\n', 'bbb\n'],
196 196 ['aaa\n', 'bbb\n', '222\n'],
197 197 ['aaa\n', 'bbb\n'])
198 198
199 199 self.assertEquals(''.join(m3.merge_lines()),
200 200 'aaa\nbbb\n222\n')
201 201
202 202 def test_append_b(self):
203 203 m3 = Merge3(['aaa\n', 'bbb\n'],
204 204 ['aaa\n', 'bbb\n'],
205 205 ['aaa\n', 'bbb\n', '222\n'])
206 206
207 207 self.assertEquals(''.join(m3.merge_lines()),
208 208 'aaa\nbbb\n222\n')
209 209
210 210 def test_append_agreement(self):
211 211 m3 = Merge3(['aaa\n', 'bbb\n'],
212 212 ['aaa\n', 'bbb\n', '222\n'],
213 213 ['aaa\n', 'bbb\n', '222\n'])
214 214
215 215 self.assertEquals(''.join(m3.merge_lines()),
216 216 'aaa\nbbb\n222\n')
217 217
218 218 def test_append_clash(self):
219 219 m3 = Merge3(['aaa\n', 'bbb\n'],
220 220 ['aaa\n', 'bbb\n', '222\n'],
221 221 ['aaa\n', 'bbb\n', '333\n'])
222 222
223 223 ml = m3.merge_lines(name_a='a',
224 224 name_b='b',
225 225 start_marker='<<',
226 226 mid_marker='--',
227 227 end_marker='>>')
228 228 self.assertEquals(''.join(ml),
229 229 'aaa\n'
230 230 'bbb\n'
231 231 '<< a\n'
232 232 '222\n'
233 233 '--\n'
234 234 '333\n'
235 235 '>> b\n'
236 236 )
237 237
238 238 def test_insert_agreement(self):
239 239 m3 = Merge3(['aaa\n', 'bbb\n'],
240 240 ['aaa\n', '222\n', 'bbb\n'],
241 241 ['aaa\n', '222\n', 'bbb\n'])
242 242
243 243 ml = m3.merge_lines(name_a='a',
244 244 name_b='b',
245 245 start_marker='<<',
246 246 mid_marker='--',
247 247 end_marker='>>')
248 248 self.assertEquals(''.join(ml), 'aaa\n222\nbbb\n')
249 249
250 250
251 251 def test_insert_clash(self):
252 252 """Both try to insert lines in the same place."""
253 253 m3 = Merge3(['aaa\n', 'bbb\n'],
254 254 ['aaa\n', '111\n', 'bbb\n'],
255 255 ['aaa\n', '222\n', 'bbb\n'])
256 256
257 257 self.assertEquals(m3.find_unconflicted(),
258 258 [(0, 1), (1, 2)])
259 259
260 260 self.assertEquals(list(m3.find_sync_regions()),
261 261 [(0, 1, 0, 1, 0, 1),
262 262 (1, 2, 2, 3, 2, 3),
263 263 (2, 2, 3, 3, 3, 3)])
264 264
265 265 self.assertEquals(list(m3.merge_regions()),
266 266 [('unchanged', 0, 1),
267 267 ('conflict', 1, 1, 1, 2, 1, 2),
268 268 ('unchanged', 1, 2)])
269 269
270 270 self.assertEquals(list(m3.merge_groups()),
271 271 [('unchanged', ['aaa\n']),
272 272 ('conflict', [], ['111\n'], ['222\n']),
273 273 ('unchanged', ['bbb\n']),
274 274 ])
275 275
276 276 ml = m3.merge_lines(name_a='a',
277 277 name_b='b',
278 278 start_marker='<<',
279 279 mid_marker='--',
280 280 end_marker='>>')
281 281 self.assertEquals(''.join(ml),
282 282 '''aaa
283 283 << a
284 284 111
285 285 --
286 286 222
287 287 >> b
288 288 bbb
289 289 ''')
290 290
291 291 def test_replace_clash(self):
292 292 """Both try to insert lines in the same place."""
293 293 m3 = Merge3(['aaa', '000', 'bbb'],
294 294 ['aaa', '111', 'bbb'],
295 295 ['aaa', '222', 'bbb'])
296 296
297 297 self.assertEquals(m3.find_unconflicted(),
298 298 [(0, 1), (2, 3)])
299 299
300 300 self.assertEquals(list(m3.find_sync_regions()),
301 301 [(0, 1, 0, 1, 0, 1),
302 302 (2, 3, 2, 3, 2, 3),
303 303 (3, 3, 3, 3, 3, 3)])
304 304
305 305 def test_replace_multi(self):
306 306 """Replacement with regions of different size."""
307 307 m3 = Merge3(['aaa', '000', '000', 'bbb'],
308 308 ['aaa', '111', '111', '111', 'bbb'],
309 309 ['aaa', '222', '222', '222', '222', 'bbb'])
310 310
311 311 self.assertEquals(m3.find_unconflicted(),
312 312 [(0, 1), (3, 4)])
313 313
314 314
315 315 self.assertEquals(list(m3.find_sync_regions()),
316 316 [(0, 1, 0, 1, 0, 1),
317 317 (3, 4, 4, 5, 5, 6),
318 318 (4, 4, 5, 5, 6, 6)])
319 319
320 320 def test_merge_poem(self):
321 321 """Test case from diff3 manual"""
322 322 m3 = Merge3(TZU, LAO, TAO)
323 323 ml = list(m3.merge_lines('LAO', 'TAO'))
324 324 self.log('merge result:')
325 325 self.log(''.join(ml))
326 326 self.assertEquals(ml, MERGED_RESULT)
327 327
328 328 def test_binary(self):
329 self.assertRaises(error.Abort, Merge3, ['\x00'], ['a'], ['b'])
329 with self.assertRaises(error.Abort):
330 Merge3(['\x00'], ['a'], ['b'])
330 331
331 332 def test_dos_text(self):
332 333 base_text = 'a\r\n'
333 334 this_text = 'b\r\n'
334 335 other_text = 'c\r\n'
335 336 m3 = Merge3(base_text.splitlines(True), other_text.splitlines(True),
336 337 this_text.splitlines(True))
337 338 m_lines = m3.merge_lines('OTHER', 'THIS')
338 339 self.assertEqual('<<<<<<< OTHER\r\nc\r\n=======\r\nb\r\n'
339 340 '>>>>>>> THIS\r\n'.splitlines(True), list(m_lines))
340 341
341 342 def test_mac_text(self):
342 343 base_text = 'a\r'
343 344 this_text = 'b\r'
344 345 other_text = 'c\r'
345 346 m3 = Merge3(base_text.splitlines(True), other_text.splitlines(True),
346 347 this_text.splitlines(True))
347 348 m_lines = m3.merge_lines('OTHER', 'THIS')
348 349 self.assertEqual('<<<<<<< OTHER\rc\r=======\rb\r'
349 350 '>>>>>>> THIS\r'.splitlines(True), list(m_lines))
350 351
351 352 if __name__ == '__main__':
352 353 # hide the timer
353 354 import time
354 355 orig = time.time
355 356 try:
356 357 time.time = lambda: 0
357 358 unittest.main()
358 359 finally:
359 360 time.time = orig
360 361
General Comments 0
You need to be logged in to leave comments. Login now