##// END OF EJS Templates
tests: fix style issue of importing order in test-lock.py...
FUJIWARA Katsunori -
r40240:5d50c9ff default
parent child Browse files
Show More
@@ -1,296 +1,297 b''
1 1 from __future__ import absolute_import
2 2
3 3 import copy
4 4 import errno
5 import silenttestrunner
6 5 import tempfile
7 6 import types
8 7 import unittest
9 8
9 import silenttestrunner
10
10 11 from mercurial import (
11 12 encoding,
12 13 error,
13 14 lock,
14 15 vfs as vfsmod,
15 16 )
16 17
17 18 testlockname = b'testlock'
18 19
19 20 # work around http://bugs.python.org/issue1515
20 21 if types.MethodType not in copy._deepcopy_dispatch:
21 22 def _deepcopy_method(x, memo):
22 23 return type(x)(x.__func__, copy.deepcopy(x.__self__, memo), x.im_class)
23 24 copy._deepcopy_dispatch[types.MethodType] = _deepcopy_method
24 25
25 26 class lockwrapper(lock.lock):
26 27 def __init__(self, pidoffset, *args, **kwargs):
27 28 # lock.lock.__init__() calls lock(), so the pidoffset assignment needs
28 29 # to be earlier
29 30 self._pidoffset = pidoffset
30 31 super(lockwrapper, self).__init__(*args, **kwargs)
31 32 def _getpid(self):
32 33 return super(lockwrapper, self)._getpid() + self._pidoffset
33 34
34 35 class teststate(object):
35 36 def __init__(self, testcase, dir, pidoffset=0):
36 37 self._testcase = testcase
37 38 self._acquirecalled = False
38 39 self._releasecalled = False
39 40 self._postreleasecalled = False
40 41 self.vfs = vfsmod.vfs(dir, audit=False)
41 42 self._pidoffset = pidoffset
42 43
43 44 def makelock(self, *args, **kwargs):
44 45 l = lockwrapper(self._pidoffset, self.vfs, testlockname,
45 46 releasefn=self.releasefn, acquirefn=self.acquirefn,
46 47 *args, **kwargs)
47 48 l.postrelease.append(self.postreleasefn)
48 49 return l
49 50
50 51 def acquirefn(self):
51 52 self._acquirecalled = True
52 53
53 54 def releasefn(self):
54 55 self._releasecalled = True
55 56
56 57 def postreleasefn(self):
57 58 self._postreleasecalled = True
58 59
59 60 def assertacquirecalled(self, called):
60 61 self._testcase.assertEqual(
61 62 self._acquirecalled, called,
62 63 'expected acquire to be %s but was actually %s' % (
63 64 self._tocalled(called),
64 65 self._tocalled(self._acquirecalled),
65 66 ))
66 67
67 68 def resetacquirefn(self):
68 69 self._acquirecalled = False
69 70
70 71 def assertreleasecalled(self, called):
71 72 self._testcase.assertEqual(
72 73 self._releasecalled, called,
73 74 'expected release to be %s but was actually %s' % (
74 75 self._tocalled(called),
75 76 self._tocalled(self._releasecalled),
76 77 ))
77 78
78 79 def assertpostreleasecalled(self, called):
79 80 self._testcase.assertEqual(
80 81 self._postreleasecalled, called,
81 82 'expected postrelease to be %s but was actually %s' % (
82 83 self._tocalled(called),
83 84 self._tocalled(self._postreleasecalled),
84 85 ))
85 86
86 87 def assertlockexists(self, exists):
87 88 actual = self.vfs.lexists(testlockname)
88 89 self._testcase.assertEqual(
89 90 actual, exists,
90 91 'expected lock to %s but actually did %s' % (
91 92 self._toexists(exists),
92 93 self._toexists(actual),
93 94 ))
94 95
95 96 def _tocalled(self, called):
96 97 if called:
97 98 return 'called'
98 99 else:
99 100 return 'not called'
100 101
101 102 def _toexists(self, exists):
102 103 if exists:
103 104 return 'exist'
104 105 else:
105 106 return 'not exist'
106 107
107 108 class testlock(unittest.TestCase):
108 109 def testlock(self):
109 110 state = teststate(self, tempfile.mkdtemp(dir=encoding.getcwd()))
110 111 lock = state.makelock()
111 112 state.assertacquirecalled(True)
112 113 lock.release()
113 114 state.assertreleasecalled(True)
114 115 state.assertpostreleasecalled(True)
115 116 state.assertlockexists(False)
116 117
117 118 def testrecursivelock(self):
118 119 state = teststate(self, tempfile.mkdtemp(dir=encoding.getcwd()))
119 120 lock = state.makelock()
120 121 state.assertacquirecalled(True)
121 122
122 123 state.resetacquirefn()
123 124 lock.lock()
124 125 # recursive lock should not call acquirefn again
125 126 state.assertacquirecalled(False)
126 127
127 128 lock.release() # brings lock refcount down from 2 to 1
128 129 state.assertreleasecalled(False)
129 130 state.assertpostreleasecalled(False)
130 131 state.assertlockexists(True)
131 132
132 133 lock.release() # releases the lock
133 134 state.assertreleasecalled(True)
134 135 state.assertpostreleasecalled(True)
135 136 state.assertlockexists(False)
136 137
137 138 def testlockfork(self):
138 139 state = teststate(self, tempfile.mkdtemp(dir=encoding.getcwd()))
139 140 lock = state.makelock()
140 141 state.assertacquirecalled(True)
141 142
142 143 # fake a fork
143 144 forklock = copy.deepcopy(lock)
144 145 forklock._pidoffset = 1
145 146 forklock.release()
146 147 state.assertreleasecalled(False)
147 148 state.assertpostreleasecalled(False)
148 149 state.assertlockexists(True)
149 150
150 151 # release the actual lock
151 152 lock.release()
152 153 state.assertreleasecalled(True)
153 154 state.assertpostreleasecalled(True)
154 155 state.assertlockexists(False)
155 156
156 157 def testinheritlock(self):
157 158 d = tempfile.mkdtemp(dir=encoding.getcwd())
158 159 parentstate = teststate(self, d)
159 160 parentlock = parentstate.makelock()
160 161 parentstate.assertacquirecalled(True)
161 162
162 163 # set up lock inheritance
163 164 with parentlock.inherit() as lockname:
164 165 parentstate.assertreleasecalled(True)
165 166 parentstate.assertpostreleasecalled(False)
166 167 parentstate.assertlockexists(True)
167 168
168 169 childstate = teststate(self, d, pidoffset=1)
169 170 childlock = childstate.makelock(parentlock=lockname)
170 171 childstate.assertacquirecalled(True)
171 172
172 173 childlock.release()
173 174 childstate.assertreleasecalled(True)
174 175 childstate.assertpostreleasecalled(False)
175 176 childstate.assertlockexists(True)
176 177
177 178 parentstate.resetacquirefn()
178 179
179 180 parentstate.assertacquirecalled(True)
180 181
181 182 parentlock.release()
182 183 parentstate.assertreleasecalled(True)
183 184 parentstate.assertpostreleasecalled(True)
184 185 parentstate.assertlockexists(False)
185 186
186 187 def testmultilock(self):
187 188 d = tempfile.mkdtemp(dir=encoding.getcwd())
188 189 state0 = teststate(self, d)
189 190 lock0 = state0.makelock()
190 191 state0.assertacquirecalled(True)
191 192
192 193 with lock0.inherit() as lock0name:
193 194 state0.assertreleasecalled(True)
194 195 state0.assertpostreleasecalled(False)
195 196 state0.assertlockexists(True)
196 197
197 198 state1 = teststate(self, d, pidoffset=1)
198 199 lock1 = state1.makelock(parentlock=lock0name)
199 200 state1.assertacquirecalled(True)
200 201
201 202 # from within lock1, acquire another lock
202 203 with lock1.inherit() as lock1name:
203 204 # since the file on disk is lock0's this should have the same
204 205 # name
205 206 self.assertEqual(lock0name, lock1name)
206 207
207 208 state2 = teststate(self, d, pidoffset=2)
208 209 lock2 = state2.makelock(parentlock=lock1name)
209 210 state2.assertacquirecalled(True)
210 211
211 212 lock2.release()
212 213 state2.assertreleasecalled(True)
213 214 state2.assertpostreleasecalled(False)
214 215 state2.assertlockexists(True)
215 216
216 217 state1.resetacquirefn()
217 218
218 219 state1.assertacquirecalled(True)
219 220
220 221 lock1.release()
221 222 state1.assertreleasecalled(True)
222 223 state1.assertpostreleasecalled(False)
223 224 state1.assertlockexists(True)
224 225
225 226 lock0.release()
226 227
227 228 def testinheritlockfork(self):
228 229 d = tempfile.mkdtemp(dir=encoding.getcwd())
229 230 parentstate = teststate(self, d)
230 231 parentlock = parentstate.makelock()
231 232 parentstate.assertacquirecalled(True)
232 233
233 234 # set up lock inheritance
234 235 with parentlock.inherit() as lockname:
235 236 childstate = teststate(self, d, pidoffset=1)
236 237 childlock = childstate.makelock(parentlock=lockname)
237 238 childstate.assertacquirecalled(True)
238 239
239 240 # fork the child lock
240 241 forkchildlock = copy.deepcopy(childlock)
241 242 forkchildlock._pidoffset += 1
242 243 forkchildlock.release()
243 244 childstate.assertreleasecalled(False)
244 245 childstate.assertpostreleasecalled(False)
245 246 childstate.assertlockexists(True)
246 247
247 248 # release the child lock
248 249 childlock.release()
249 250 childstate.assertreleasecalled(True)
250 251 childstate.assertpostreleasecalled(False)
251 252 childstate.assertlockexists(True)
252 253
253 254 parentlock.release()
254 255
255 256 def testinheritcheck(self):
256 257 d = tempfile.mkdtemp(dir=encoding.getcwd())
257 258 state = teststate(self, d)
258 259 def check():
259 260 raise error.LockInheritanceContractViolation('check failed')
260 261 lock = state.makelock(inheritchecker=check)
261 262 state.assertacquirecalled(True)
262 263
263 264 with self.assertRaises(error.LockInheritanceContractViolation):
264 265 with lock.inherit():
265 266 pass
266 267
267 268 lock.release()
268 269
269 270 def testfrequentlockunlock(self):
270 271 """This tests whether lock acquisition fails as expected, even if
271 272 (1) lock can't be acquired (makelock fails by EEXIST), and
272 273 (2) locker info can't be read in (readlock fails by ENOENT) while
273 274 retrying 5 times.
274 275 """
275 276
276 277 d = tempfile.mkdtemp(dir=encoding.getcwd())
277 278 state = teststate(self, d)
278 279
279 280 def emulatefrequentlock(*args):
280 281 raise OSError(errno.EEXIST, "File exists")
281 282 def emulatefrequentunlock(*args):
282 283 raise OSError(errno.ENOENT, "No such file or directory")
283 284
284 285 state.vfs.makelock = emulatefrequentlock
285 286 state.vfs.readlock = emulatefrequentunlock
286 287
287 288 try:
288 289 state.makelock(timeout=0)
289 290 self.fail("unexpected lock acquisition")
290 291 except error.LockHeld as why:
291 292 self.assertTrue(why.errno == errno.ETIMEDOUT)
292 293 self.assertTrue(why.locker == "")
293 294 state.assertlockexists(False)
294 295
295 296 if __name__ == '__main__':
296 297 silenttestrunner.main(__name__)
General Comments 0
You need to be logged in to leave comments. Login now