##// END OF EJS Templates
test-lock.py: allow PID to be changed in test state...
Siddharth Agarwal -
r26385:fb1a424e default
parent child Browse files
Show More
@@ -1,145 +1,147 b''
1 from __future__ import absolute_import
1 from __future__ import absolute_import
2
2
3 import os
3 import os
4 import silenttestrunner
4 import silenttestrunner
5 import tempfile
5 import tempfile
6 import unittest
6 import unittest
7
7
8 from mercurial import (
8 from mercurial import (
9 lock,
9 lock,
10 scmutil,
10 scmutil,
11 )
11 )
12
12
13 testlockname = 'testlock'
13 testlockname = 'testlock'
14
14
15 class lockwrapper(lock.lock):
15 class lockwrapper(lock.lock):
16 def __init__(self, pidoffset, *args, **kwargs):
16 def __init__(self, pidoffset, *args, **kwargs):
17 # lock.lock.__init__() calls lock(), so the pidoffset assignment needs
17 # lock.lock.__init__() calls lock(), so the pidoffset assignment needs
18 # to be earlier
18 # to be earlier
19 self._pidoffset = pidoffset
19 self._pidoffset = pidoffset
20 super(lockwrapper, self).__init__(*args, **kwargs)
20 super(lockwrapper, self).__init__(*args, **kwargs)
21 def _getpid(self):
21 def _getpid(self):
22 return os.getpid() + self._pidoffset
22 return os.getpid() + self._pidoffset
23
23
24 class teststate(object):
24 class teststate(object):
25 def __init__(self, testcase, dir):
25 def __init__(self, testcase, dir, pidoffset=0):
26 self._testcase = testcase
26 self._testcase = testcase
27 self._acquirecalled = False
27 self._acquirecalled = False
28 self._releasecalled = False
28 self._releasecalled = False
29 self._postreleasecalled = False
29 self._postreleasecalled = False
30 self.vfs = scmutil.vfs(dir, audit=False)
30 self.vfs = scmutil.vfs(dir, audit=False)
31 self._pidoffset = pidoffset
31
32
32 def makelock(self, *args, **kwargs):
33 def makelock(self, *args, **kwargs):
33 l = lock.lock(self.vfs, testlockname, releasefn=self.releasefn,
34 l = lockwrapper(self._pidoffset, self.vfs, testlockname,
34 acquirefn=self.acquirefn, *args, **kwargs)
35 releasefn=self.releasefn, acquirefn=self.acquirefn,
36 *args, **kwargs)
35 l.postrelease.append(self.postreleasefn)
37 l.postrelease.append(self.postreleasefn)
36 return l
38 return l
37
39
38 def acquirefn(self):
40 def acquirefn(self):
39 self._acquirecalled = True
41 self._acquirecalled = True
40
42
41 def releasefn(self):
43 def releasefn(self):
42 self._releasecalled = True
44 self._releasecalled = True
43
45
44 def postreleasefn(self):
46 def postreleasefn(self):
45 self._postreleasecalled = True
47 self._postreleasecalled = True
46
48
47 def assertacquirecalled(self, called):
49 def assertacquirecalled(self, called):
48 self._testcase.assertEqual(
50 self._testcase.assertEqual(
49 self._acquirecalled, called,
51 self._acquirecalled, called,
50 'expected acquire to be %s but was actually %s' % (
52 'expected acquire to be %s but was actually %s' % (
51 self._tocalled(called),
53 self._tocalled(called),
52 self._tocalled(self._acquirecalled),
54 self._tocalled(self._acquirecalled),
53 ))
55 ))
54
56
55 def resetacquirefn(self):
57 def resetacquirefn(self):
56 self._acquirecalled = False
58 self._acquirecalled = False
57
59
58 def assertreleasecalled(self, called):
60 def assertreleasecalled(self, called):
59 self._testcase.assertEqual(
61 self._testcase.assertEqual(
60 self._releasecalled, called,
62 self._releasecalled, called,
61 'expected release to be %s but was actually %s' % (
63 'expected release to be %s but was actually %s' % (
62 self._tocalled(called),
64 self._tocalled(called),
63 self._tocalled(self._releasecalled),
65 self._tocalled(self._releasecalled),
64 ))
66 ))
65
67
66 def assertpostreleasecalled(self, called):
68 def assertpostreleasecalled(self, called):
67 self._testcase.assertEqual(
69 self._testcase.assertEqual(
68 self._postreleasecalled, called,
70 self._postreleasecalled, called,
69 'expected postrelease to be %s but was actually %s' % (
71 'expected postrelease to be %s but was actually %s' % (
70 self._tocalled(called),
72 self._tocalled(called),
71 self._tocalled(self._postreleasecalled),
73 self._tocalled(self._postreleasecalled),
72 ))
74 ))
73
75
74 def assertlockexists(self, exists):
76 def assertlockexists(self, exists):
75 actual = self.vfs.lexists(testlockname)
77 actual = self.vfs.lexists(testlockname)
76 self._testcase.assertEqual(
78 self._testcase.assertEqual(
77 actual, exists,
79 actual, exists,
78 'expected lock to %s but actually did %s' % (
80 'expected lock to %s but actually did %s' % (
79 self._toexists(exists),
81 self._toexists(exists),
80 self._toexists(actual),
82 self._toexists(actual),
81 ))
83 ))
82
84
83 def _tocalled(self, called):
85 def _tocalled(self, called):
84 if called:
86 if called:
85 return 'called'
87 return 'called'
86 else:
88 else:
87 return 'not called'
89 return 'not called'
88
90
89 def _toexists(self, exists):
91 def _toexists(self, exists):
90 if exists:
92 if exists:
91 return 'exist'
93 return 'exist'
92 else:
94 else:
93 return 'not exist'
95 return 'not exist'
94
96
95 class testlock(unittest.TestCase):
97 class testlock(unittest.TestCase):
96 def testlock(self):
98 def testlock(self):
97 state = teststate(self, tempfile.mkdtemp(dir=os.getcwd()))
99 state = teststate(self, tempfile.mkdtemp(dir=os.getcwd()))
98 lock = state.makelock()
100 lock = state.makelock()
99 state.assertacquirecalled(True)
101 state.assertacquirecalled(True)
100 lock.release()
102 lock.release()
101 state.assertreleasecalled(True)
103 state.assertreleasecalled(True)
102 state.assertpostreleasecalled(True)
104 state.assertpostreleasecalled(True)
103 state.assertlockexists(False)
105 state.assertlockexists(False)
104
106
105 def testrecursivelock(self):
107 def testrecursivelock(self):
106 state = teststate(self, tempfile.mkdtemp(dir=os.getcwd()))
108 state = teststate(self, tempfile.mkdtemp(dir=os.getcwd()))
107 lock = state.makelock()
109 lock = state.makelock()
108 state.assertacquirecalled(True)
110 state.assertacquirecalled(True)
109
111
110 state.resetacquirefn()
112 state.resetacquirefn()
111 lock.lock()
113 lock.lock()
112 # recursive lock should not call acquirefn again
114 # recursive lock should not call acquirefn again
113 state.assertacquirecalled(False)
115 state.assertacquirecalled(False)
114
116
115 lock.release() # brings lock refcount down from 2 to 1
117 lock.release() # brings lock refcount down from 2 to 1
116 state.assertreleasecalled(False)
118 state.assertreleasecalled(False)
117 state.assertpostreleasecalled(False)
119 state.assertpostreleasecalled(False)
118 state.assertlockexists(True)
120 state.assertlockexists(True)
119
121
120 lock.release() # releases the lock
122 lock.release() # releases the lock
121 state.assertreleasecalled(True)
123 state.assertreleasecalled(True)
122 state.assertpostreleasecalled(True)
124 state.assertpostreleasecalled(True)
123 state.assertlockexists(False)
125 state.assertlockexists(False)
124
126
125 def testlockfork(self):
127 def testlockfork(self):
126 state = teststate(self, tempfile.mkdtemp(dir=os.getcwd()))
128 state = teststate(self, tempfile.mkdtemp(dir=os.getcwd()))
127 lock = state.makelock()
129 lock = state.makelock()
128 state.assertacquirecalled(True)
130 state.assertacquirecalled(True)
129 lock.lock()
131 lock.lock()
130 # fake a fork
132 # fake a fork
131 lock.pid += 1
133 lock.pid += 1
132 lock.release()
134 lock.release()
133 state.assertreleasecalled(False)
135 state.assertreleasecalled(False)
134 state.assertpostreleasecalled(False)
136 state.assertpostreleasecalled(False)
135 state.assertlockexists(True)
137 state.assertlockexists(True)
136
138
137 # release the actual lock
139 # release the actual lock
138 lock.pid -= 1
140 lock.pid -= 1
139 lock.release()
141 lock.release()
140 state.assertreleasecalled(True)
142 state.assertreleasecalled(True)
141 state.assertpostreleasecalled(True)
143 state.assertpostreleasecalled(True)
142 state.assertlockexists(False)
144 state.assertlockexists(False)
143
145
144 if __name__ == '__main__':
146 if __name__ == '__main__':
145 silenttestrunner.main(__name__)
147 silenttestrunner.main(__name__)
General Comments 0
You need to be logged in to leave comments. Login now