test-atomictempfile.py
119 lines
| 4.1 KiB
| text/x-python
|
PythonLexer
/ tests / test-atomictempfile.py
Pulkit Goyal
|
r29194 | from __future__ import absolute_import | ||
import glob | ||||
Greg Ward
|
r14007 | import os | ||
Martijn Pieters
|
r29391 | import shutil | ||
import tempfile | ||||
Idan Kamara
|
r18666 | import unittest | ||
Pulkit Goyal
|
r29194 | from mercurial import ( | ||
util, | ||||
) | ||||
atomictempfile = util.atomictempfile | ||||
Greg Ward
|
r14007 | |||
Idan Kamara
|
r18666 | class testatomictempfile(unittest.TestCase): | ||
Martijn Pieters
|
r29391 | def setUp(self): | ||
self._testdir = tempfile.mkdtemp('atomictempfiletest') | ||||
self._filename = os.path.join(self._testdir, 'testfilename') | ||||
def tearDown(self): | ||||
shutil.rmtree(self._testdir, True) | ||||
Martijn Pieters
|
r29392 | def testsimple(self): | ||
Martijn Pieters
|
r29391 | file = atomictempfile(self._filename) | ||
self.assertFalse(os.path.isfile(self._filename)) | ||||
tempfilename = file._tempname | ||||
self.assertTrue(tempfilename in glob.glob( | ||||
os.path.join(self._testdir, '.testfilename-*'))) | ||||
Greg Ward
|
r14007 | |||
timeless
|
r29188 | file.write(b'argh\n') | ||
Idan Kamara
|
r18666 | file.close() | ||
Greg Ward
|
r14007 | |||
Martijn Pieters
|
r29391 | self.assertTrue(os.path.isfile(self._filename)) | ||
self.assertTrue(tempfilename not in glob.glob( | ||||
os.path.join(self._testdir, '.testfilename-*'))) | ||||
Greg Ward
|
r14007 | |||
Idan Kamara
|
r18666 | # discard() removes the temp file without making the write permanent | ||
Martijn Pieters
|
r29392 | def testdiscard(self): | ||
Martijn Pieters
|
r29391 | file = atomictempfile(self._filename) | ||
Idan Kamara
|
r18666 | (dir, basename) = os.path.split(file._tempname) | ||
Greg Ward
|
r14007 | |||
timeless
|
r29188 | file.write(b'yo\n') | ||
Idan Kamara
|
r18666 | file.discard() | ||
Greg Ward
|
r14007 | |||
Martijn Pieters
|
r29391 | self.assertFalse(os.path.isfile(self._filename)) | ||
Idan Kamara
|
r18666 | self.assertTrue(basename not in os.listdir('.')) | ||
# if a programmer screws up and passes bad args to atomictempfile, they | ||||
# get a plain ordinary TypeError, not infinite recursion | ||||
Martijn Pieters
|
r29392 | def testoops(self): | ||
Idan Kamara
|
r18666 | self.assertRaises(TypeError, atomictempfile) | ||
Greg Ward
|
r14007 | |||
FUJIWARA Katsunori
|
r29201 | # checkambig=True avoids ambiguity of timestamp | ||
Martijn Pieters
|
r29392 | def testcheckambig(self): | ||
FUJIWARA Katsunori
|
r29201 | def atomicwrite(checkambig): | ||
Martijn Pieters
|
r29391 | f = atomictempfile(self._filename, checkambig=checkambig) | ||
FUJIWARA Katsunori
|
r29201 | f.write('FOO') | ||
f.close() | ||||
# try some times, because reproduction of ambiguity depends on | ||||
# "filesystem time" | ||||
for i in xrange(5): | ||||
atomicwrite(False) | ||||
Martijn Pieters
|
r29391 | oldstat = os.stat(self._filename) | ||
FUJIWARA Katsunori
|
r29201 | if oldstat.st_ctime != oldstat.st_mtime: | ||
# subsequent changing never causes ambiguity | ||||
continue | ||||
repetition = 3 | ||||
# repeat atomic write with checkambig=True, to examine | ||||
Mads Kiilerich
|
r30332 | # whether st_mtime is advanced multiple times as expected | ||
FUJIWARA Katsunori
|
r29201 | for j in xrange(repetition): | ||
atomicwrite(True) | ||||
Martijn Pieters
|
r29391 | newstat = os.stat(self._filename) | ||
FUJIWARA Katsunori
|
r29201 | if oldstat.st_ctime != newstat.st_ctime: | ||
# timestamp ambiguity was naturally avoided while repetition | ||||
continue | ||||
# st_mtime should be advanced "repetition" times, because | ||||
Mads Kiilerich
|
r30332 | # all atomicwrite() occurred at same time (in sec) | ||
FUJIWARA Katsunori
|
r29201 | self.assertTrue(newstat.st_mtime == | ||
((oldstat.st_mtime + repetition) & 0x7fffffff)) | ||||
# no more examination is needed, if assumption above is true | ||||
break | ||||
else: | ||||
# This platform seems too slow to examine anti-ambiguity | ||||
# of file timestamp (or test happened to be executed at | ||||
# bad timing). Exit silently in this case, because running | ||||
# on other faster platforms can detect problems | ||||
pass | ||||
Martijn Pieters
|
r29393 | def testread(self): | ||
with open(self._filename, 'wb') as f: | ||||
f.write(b'foobar\n') | ||||
file = atomictempfile(self._filename, mode='rb') | ||||
self.assertTrue(file.read(), b'foobar\n') | ||||
file.discard() | ||||
Martijn Pieters
|
r29394 | def testcontextmanagersuccess(self): | ||
"""When the context closes, the file is closed""" | ||||
with atomictempfile('foo') as f: | ||||
self.assertFalse(os.path.isfile('foo')) | ||||
f.write(b'argh\n') | ||||
self.assertTrue(os.path.isfile('foo')) | ||||
def testcontextmanagerfailure(self): | ||||
"""On exception, the file is discarded""" | ||||
try: | ||||
with atomictempfile('foo') as f: | ||||
self.assertFalse(os.path.isfile('foo')) | ||||
f.write(b'argh\n') | ||||
raise ValueError | ||||
except ValueError: | ||||
pass | ||||
self.assertFalse(os.path.isfile('foo')) | ||||
Greg Ward
|
r14007 | if __name__ == '__main__': | ||
Pulkit Goyal
|
r29194 | import silenttestrunner | ||
Idan Kamara
|
r18666 | silenttestrunner.main(__name__) | ||