##// END OF EJS Templates
vfs: use 'vfs' module directly in 'test-lock'...
Pierre-Yves David -
r31249:e067741d default
parent child Browse files
Show More
@@ -1,271 +1,271 b''
1 1 from __future__ import absolute_import
2 2
3 3 import copy
4 4 import os
5 5 import silenttestrunner
6 6 import tempfile
7 7 import types
8 8 import unittest
9 9
10 10 from mercurial import (
11 11 error,
12 12 lock,
13 scmutil,
13 vfs as vfsmod,
14 14 )
15 15
16 16 testlockname = 'testlock'
17 17
18 18 # work around http://bugs.python.org/issue1515
19 19 if types.MethodType not in copy._deepcopy_dispatch:
20 20 def _deepcopy_method(x, memo):
21 21 return type(x)(x.im_func, copy.deepcopy(x.im_self, memo), x.im_class)
22 22 copy._deepcopy_dispatch[types.MethodType] = _deepcopy_method
23 23
24 24 class lockwrapper(lock.lock):
25 25 def __init__(self, pidoffset, *args, **kwargs):
26 26 # lock.lock.__init__() calls lock(), so the pidoffset assignment needs
27 27 # to be earlier
28 28 self._pidoffset = pidoffset
29 29 super(lockwrapper, self).__init__(*args, **kwargs)
30 30 def _getpid(self):
31 31 return super(lockwrapper, self)._getpid() + self._pidoffset
32 32
33 33 class teststate(object):
34 34 def __init__(self, testcase, dir, pidoffset=0):
35 35 self._testcase = testcase
36 36 self._acquirecalled = False
37 37 self._releasecalled = False
38 38 self._postreleasecalled = False
39 self.vfs = scmutil.vfs(dir, audit=False)
39 self.vfs = vfsmod.vfs(dir, audit=False)
40 40 self._pidoffset = pidoffset
41 41
42 42 def makelock(self, *args, **kwargs):
43 43 l = lockwrapper(self._pidoffset, self.vfs, testlockname,
44 44 releasefn=self.releasefn, acquirefn=self.acquirefn,
45 45 *args, **kwargs)
46 46 l.postrelease.append(self.postreleasefn)
47 47 return l
48 48
49 49 def acquirefn(self):
50 50 self._acquirecalled = True
51 51
52 52 def releasefn(self):
53 53 self._releasecalled = True
54 54
55 55 def postreleasefn(self):
56 56 self._postreleasecalled = True
57 57
58 58 def assertacquirecalled(self, called):
59 59 self._testcase.assertEqual(
60 60 self._acquirecalled, called,
61 61 'expected acquire to be %s but was actually %s' % (
62 62 self._tocalled(called),
63 63 self._tocalled(self._acquirecalled),
64 64 ))
65 65
66 66 def resetacquirefn(self):
67 67 self._acquirecalled = False
68 68
69 69 def assertreleasecalled(self, called):
70 70 self._testcase.assertEqual(
71 71 self._releasecalled, called,
72 72 'expected release to be %s but was actually %s' % (
73 73 self._tocalled(called),
74 74 self._tocalled(self._releasecalled),
75 75 ))
76 76
77 77 def assertpostreleasecalled(self, called):
78 78 self._testcase.assertEqual(
79 79 self._postreleasecalled, called,
80 80 'expected postrelease to be %s but was actually %s' % (
81 81 self._tocalled(called),
82 82 self._tocalled(self._postreleasecalled),
83 83 ))
84 84
85 85 def assertlockexists(self, exists):
86 86 actual = self.vfs.lexists(testlockname)
87 87 self._testcase.assertEqual(
88 88 actual, exists,
89 89 'expected lock to %s but actually did %s' % (
90 90 self._toexists(exists),
91 91 self._toexists(actual),
92 92 ))
93 93
94 94 def _tocalled(self, called):
95 95 if called:
96 96 return 'called'
97 97 else:
98 98 return 'not called'
99 99
100 100 def _toexists(self, exists):
101 101 if exists:
102 102 return 'exist'
103 103 else:
104 104 return 'not exist'
105 105
106 106 class testlock(unittest.TestCase):
107 107 def testlock(self):
108 108 state = teststate(self, tempfile.mkdtemp(dir=os.getcwd()))
109 109 lock = state.makelock()
110 110 state.assertacquirecalled(True)
111 111 lock.release()
112 112 state.assertreleasecalled(True)
113 113 state.assertpostreleasecalled(True)
114 114 state.assertlockexists(False)
115 115
116 116 def testrecursivelock(self):
117 117 state = teststate(self, tempfile.mkdtemp(dir=os.getcwd()))
118 118 lock = state.makelock()
119 119 state.assertacquirecalled(True)
120 120
121 121 state.resetacquirefn()
122 122 lock.lock()
123 123 # recursive lock should not call acquirefn again
124 124 state.assertacquirecalled(False)
125 125
126 126 lock.release() # brings lock refcount down from 2 to 1
127 127 state.assertreleasecalled(False)
128 128 state.assertpostreleasecalled(False)
129 129 state.assertlockexists(True)
130 130
131 131 lock.release() # releases the lock
132 132 state.assertreleasecalled(True)
133 133 state.assertpostreleasecalled(True)
134 134 state.assertlockexists(False)
135 135
136 136 def testlockfork(self):
137 137 state = teststate(self, tempfile.mkdtemp(dir=os.getcwd()))
138 138 lock = state.makelock()
139 139 state.assertacquirecalled(True)
140 140
141 141 # fake a fork
142 142 forklock = copy.deepcopy(lock)
143 143 forklock._pidoffset = 1
144 144 forklock.release()
145 145 state.assertreleasecalled(False)
146 146 state.assertpostreleasecalled(False)
147 147 state.assertlockexists(True)
148 148
149 149 # release the actual lock
150 150 lock.release()
151 151 state.assertreleasecalled(True)
152 152 state.assertpostreleasecalled(True)
153 153 state.assertlockexists(False)
154 154
155 155 def testinheritlock(self):
156 156 d = tempfile.mkdtemp(dir=os.getcwd())
157 157 parentstate = teststate(self, d)
158 158 parentlock = parentstate.makelock()
159 159 parentstate.assertacquirecalled(True)
160 160
161 161 # set up lock inheritance
162 162 with parentlock.inherit() as lockname:
163 163 parentstate.assertreleasecalled(True)
164 164 parentstate.assertpostreleasecalled(False)
165 165 parentstate.assertlockexists(True)
166 166
167 167 childstate = teststate(self, d, pidoffset=1)
168 168 childlock = childstate.makelock(parentlock=lockname)
169 169 childstate.assertacquirecalled(True)
170 170
171 171 childlock.release()
172 172 childstate.assertreleasecalled(True)
173 173 childstate.assertpostreleasecalled(False)
174 174 childstate.assertlockexists(True)
175 175
176 176 parentstate.resetacquirefn()
177 177
178 178 parentstate.assertacquirecalled(True)
179 179
180 180 parentlock.release()
181 181 parentstate.assertreleasecalled(True)
182 182 parentstate.assertpostreleasecalled(True)
183 183 parentstate.assertlockexists(False)
184 184
185 185 def testmultilock(self):
186 186 d = tempfile.mkdtemp(dir=os.getcwd())
187 187 state0 = teststate(self, d)
188 188 lock0 = state0.makelock()
189 189 state0.assertacquirecalled(True)
190 190
191 191 with lock0.inherit() as lock0name:
192 192 state0.assertreleasecalled(True)
193 193 state0.assertpostreleasecalled(False)
194 194 state0.assertlockexists(True)
195 195
196 196 state1 = teststate(self, d, pidoffset=1)
197 197 lock1 = state1.makelock(parentlock=lock0name)
198 198 state1.assertacquirecalled(True)
199 199
200 200 # from within lock1, acquire another lock
201 201 with lock1.inherit() as lock1name:
202 202 # since the file on disk is lock0's this should have the same
203 203 # name
204 204 self.assertEqual(lock0name, lock1name)
205 205
206 206 state2 = teststate(self, d, pidoffset=2)
207 207 lock2 = state2.makelock(parentlock=lock1name)
208 208 state2.assertacquirecalled(True)
209 209
210 210 lock2.release()
211 211 state2.assertreleasecalled(True)
212 212 state2.assertpostreleasecalled(False)
213 213 state2.assertlockexists(True)
214 214
215 215 state1.resetacquirefn()
216 216
217 217 state1.assertacquirecalled(True)
218 218
219 219 lock1.release()
220 220 state1.assertreleasecalled(True)
221 221 state1.assertpostreleasecalled(False)
222 222 state1.assertlockexists(True)
223 223
224 224 lock0.release()
225 225
226 226 def testinheritlockfork(self):
227 227 d = tempfile.mkdtemp(dir=os.getcwd())
228 228 parentstate = teststate(self, d)
229 229 parentlock = parentstate.makelock()
230 230 parentstate.assertacquirecalled(True)
231 231
232 232 # set up lock inheritance
233 233 with parentlock.inherit() as lockname:
234 234 childstate = teststate(self, d, pidoffset=1)
235 235 childlock = childstate.makelock(parentlock=lockname)
236 236 childstate.assertacquirecalled(True)
237 237
238 238 # fork the child lock
239 239 forkchildlock = copy.deepcopy(childlock)
240 240 forkchildlock._pidoffset += 1
241 241 forkchildlock.release()
242 242 childstate.assertreleasecalled(False)
243 243 childstate.assertpostreleasecalled(False)
244 244 childstate.assertlockexists(True)
245 245
246 246 # release the child lock
247 247 childlock.release()
248 248 childstate.assertreleasecalled(True)
249 249 childstate.assertpostreleasecalled(False)
250 250 childstate.assertlockexists(True)
251 251
252 252 parentlock.release()
253 253
254 254 def testinheritcheck(self):
255 255 d = tempfile.mkdtemp(dir=os.getcwd())
256 256 state = teststate(self, d)
257 257 def check():
258 258 raise error.LockInheritanceContractViolation('check failed')
259 259 lock = state.makelock(inheritchecker=check)
260 260 state.assertacquirecalled(True)
261 261
262 262 def tryinherit():
263 263 with lock.inherit():
264 264 pass
265 265
266 266 self.assertRaises(error.LockInheritanceContractViolation, tryinherit)
267 267
268 268 lock.release()
269 269
270 270 if __name__ == '__main__':
271 271 silenttestrunner.main(__name__)
General Comments 0
You need to be logged in to leave comments. Login now