from __future__ import absolute_import import os import silenttestrunner import tempfile import unittest from mercurial import ( lock, scmutil, ) testlockname = 'testlock' class teststate(object): def __init__(self, testcase): self._testcase = testcase self._acquirecalled = False self._releasecalled = False self._postreleasecalled = False d = tempfile.mkdtemp(dir=os.getcwd()) self.vfs = scmutil.vfs(d, audit=False) def makelock(self, *args, **kwargs): l = lock.lock(self.vfs, testlockname, releasefn=self.releasefn, acquirefn=self.acquirefn, *args, **kwargs) l.postrelease.append(self.postreleasefn) return l def acquirefn(self): self._acquirecalled = True def releasefn(self): self._releasecalled = True def postreleasefn(self): self._postreleasecalled = True def assertacquirecalled(self, called): self._testcase.assertEqual( self._acquirecalled, called, 'expected acquire to be %s but was actually %s' % ( self._tocalled(called), self._tocalled(self._acquirecalled), )) def resetacquirefn(self): self._acquirecalled = False def assertreleasecalled(self, called): self._testcase.assertEqual( self._releasecalled, called, 'expected release to be %s but was actually %s' % ( self._tocalled(called), self._tocalled(self._releasecalled), )) def assertpostreleasecalled(self, called): self._testcase.assertEqual( self._postreleasecalled, called, 'expected postrelease to be %s but was actually %s' % ( self._tocalled(called), self._tocalled(self._postreleasecalled), )) def assertlockexists(self, exists): actual = self.vfs.lexists(testlockname) self._testcase.assertEqual( actual, exists, 'expected lock to %s but actually did %s' % ( self._toexists(exists), self._toexists(actual), )) def _tocalled(self, called): if called: return 'called' else: return 'not called' def _toexists(self, exists): if exists: return 'exists' else: return 'not exists' class testlock(unittest.TestCase): def testlock(self): state = teststate(self) lock = state.makelock() state.assertacquirecalled(True) lock.release() state.assertreleasecalled(True) state.assertpostreleasecalled(True) state.assertlockexists(False) def testrecursivelock(self): state = teststate(self) lock = state.makelock() state.assertacquirecalled(True) state.resetacquirefn() lock.lock() # recursive lock should not call acquirefn again state.assertacquirecalled(False) lock.release() # brings lock refcount down from 2 to 1 state.assertreleasecalled(False) state.assertpostreleasecalled(False) state.assertlockexists(True) lock.release() # releases the lock state.assertreleasecalled(True) state.assertpostreleasecalled(True) state.assertlockexists(False) def testlockfork(self): state = teststate(self) lock = state.makelock() state.assertacquirecalled(True) lock.lock() # fake a fork lock.pid += 1 lock.release() state.assertreleasecalled(False) state.assertpostreleasecalled(False) state.assertlockexists(True) # release the actual lock lock.pid -= 1 lock.release() state.assertreleasecalled(True) state.assertpostreleasecalled(True) state.assertlockexists(False) if __name__ == '__main__': silenttestrunner.main(__name__)