test-atomictempfile.py
125 lines
| 4.2 KiB
| text/x-python
|
PythonLexer
/ tests / test-atomictempfile.py
Pulkit Goyal
|
r29194 | from __future__ import absolute_import | ||
import glob | ||||
Greg Ward
|
r14007 | import os | ||
Martijn Pieters
|
r29391 | import shutil | ||
Augie Fackler
|
r36799 | import stat | ||
Martijn Pieters
|
r29391 | import tempfile | ||
Idan Kamara
|
r18666 | import unittest | ||
Pulkit Goyal
|
r29194 | from mercurial import ( | ||
Pulkit Goyal
|
r36299 | pycompat, | ||
Pulkit Goyal
|
r29194 | util, | ||
) | ||||
atomictempfile = util.atomictempfile | ||||
Greg Ward
|
r14007 | |||
Pulkit Goyal
|
r36299 | if pycompat.ispy3: | ||
xrange = range | ||||
Idan Kamara
|
r18666 | class testatomictempfile(unittest.TestCase): | ||
Martijn Pieters
|
r29391 | def setUp(self): | ||
Augie Fackler
|
r36634 | self._testdir = tempfile.mkdtemp(b'atomictempfiletest') | ||
self._filename = os.path.join(self._testdir, b'testfilename') | ||||
Martijn Pieters
|
r29391 | |||
def tearDown(self): | ||||
shutil.rmtree(self._testdir, True) | ||||
Martijn Pieters
|
r29392 | def testsimple(self): | ||
Martijn Pieters
|
r29391 | file = atomictempfile(self._filename) | ||
self.assertFalse(os.path.isfile(self._filename)) | ||||
tempfilename = file._tempname | ||||
self.assertTrue(tempfilename in glob.glob( | ||||
Augie Fackler
|
r36634 | os.path.join(self._testdir, b'.testfilename-*'))) | ||
Greg Ward
|
r14007 | |||
timeless
|
r29188 | file.write(b'argh\n') | ||
Idan Kamara
|
r18666 | file.close() | ||
Greg Ward
|
r14007 | |||
Martijn Pieters
|
r29391 | self.assertTrue(os.path.isfile(self._filename)) | ||
self.assertTrue(tempfilename not in glob.glob( | ||||
Augie Fackler
|
r36634 | os.path.join(self._testdir, b'.testfilename-*'))) | ||
Greg Ward
|
r14007 | |||
Idan Kamara
|
r18666 | # discard() removes the temp file without making the write permanent | ||
Martijn Pieters
|
r29392 | def testdiscard(self): | ||
Martijn Pieters
|
r29391 | file = atomictempfile(self._filename) | ||
Idan Kamara
|
r18666 | (dir, basename) = os.path.split(file._tempname) | ||
Greg Ward
|
r14007 | |||
timeless
|
r29188 | file.write(b'yo\n') | ||
Idan Kamara
|
r18666 | file.discard() | ||
Greg Ward
|
r14007 | |||
Martijn Pieters
|
r29391 | self.assertFalse(os.path.isfile(self._filename)) | ||
Augie Fackler
|
r36634 | self.assertTrue(basename not in os.listdir(b'.')) | ||
Idan Kamara
|
r18666 | |||
# if a programmer screws up and passes bad args to atomictempfile, they | ||||
# get a plain ordinary TypeError, not infinite recursion | ||||
Martijn Pieters
|
r29392 | def testoops(self): | ||
Gregory Szorc
|
r32279 | with self.assertRaises(TypeError): | ||
atomictempfile() | ||||
Greg Ward
|
r14007 | |||
FUJIWARA Katsunori
|
r29201 | # checkambig=True avoids ambiguity of timestamp | ||
Martijn Pieters
|
r29392 | def testcheckambig(self): | ||
FUJIWARA Katsunori
|
r29201 | def atomicwrite(checkambig): | ||
Martijn Pieters
|
r29391 | f = atomictempfile(self._filename, checkambig=checkambig) | ||
Augie Fackler
|
r36634 | f.write(b'FOO') | ||
FUJIWARA Katsunori
|
r29201 | f.close() | ||
# try some times, because reproduction of ambiguity depends on | ||||
# "filesystem time" | ||||
for i in xrange(5): | ||||
atomicwrite(False) | ||||
Martijn Pieters
|
r29391 | oldstat = os.stat(self._filename) | ||
Augie Fackler
|
r36799 | if oldstat[stat.ST_CTIME] != oldstat[stat.ST_MTIME]: | ||
FUJIWARA Katsunori
|
r29201 | # subsequent changing never causes ambiguity | ||
continue | ||||
repetition = 3 | ||||
# repeat atomic write with checkambig=True, to examine | ||||
Mads Kiilerich
|
r30332 | # whether st_mtime is advanced multiple times as expected | ||
FUJIWARA Katsunori
|
r29201 | for j in xrange(repetition): | ||
atomicwrite(True) | ||||
Martijn Pieters
|
r29391 | newstat = os.stat(self._filename) | ||
Augie Fackler
|
r36799 | if oldstat[stat.ST_CTIME] != newstat[stat.ST_CTIME]: | ||
FUJIWARA Katsunori
|
r29201 | # timestamp ambiguity was naturally avoided while repetition | ||
continue | ||||
# st_mtime should be advanced "repetition" times, because | ||||
Mads Kiilerich
|
r30332 | # all atomicwrite() occurred at same time (in sec) | ||
Augie Fackler
|
r36799 | oldtime = (oldstat[stat.ST_MTIME] + repetition) & 0x7fffffff | ||
self.assertTrue(newstat[stat.ST_MTIME] == oldtime) | ||||
FUJIWARA Katsunori
|
r29201 | # no more examination is needed, if assumption above is true | ||
break | ||||
else: | ||||
# This platform seems too slow to examine anti-ambiguity | ||||
# of file timestamp (or test happened to be executed at | ||||
# bad timing). Exit silently in this case, because running | ||||
# on other faster platforms can detect problems | ||||
pass | ||||
Martijn Pieters
|
r29393 | def testread(self): | ||
with open(self._filename, 'wb') as f: | ||||
f.write(b'foobar\n') | ||||
Augie Fackler
|
r36634 | file = atomictempfile(self._filename, mode=b'rb') | ||
Martijn Pieters
|
r29393 | self.assertTrue(file.read(), b'foobar\n') | ||
file.discard() | ||||
Martijn Pieters
|
r29394 | def testcontextmanagersuccess(self): | ||
"""When the context closes, the file is closed""" | ||||
Augie Fackler
|
r36634 | with atomictempfile(b'foo') as f: | ||
self.assertFalse(os.path.isfile(b'foo')) | ||||
Martijn Pieters
|
r29394 | f.write(b'argh\n') | ||
Augie Fackler
|
r36634 | self.assertTrue(os.path.isfile(b'foo')) | ||
Martijn Pieters
|
r29394 | |||
def testcontextmanagerfailure(self): | ||||
"""On exception, the file is discarded""" | ||||
try: | ||||
Augie Fackler
|
r36634 | with atomictempfile(b'foo') as f: | ||
self.assertFalse(os.path.isfile(b'foo')) | ||||
Martijn Pieters
|
r29394 | f.write(b'argh\n') | ||
raise ValueError | ||||
except ValueError: | ||||
pass | ||||
Augie Fackler
|
r36634 | self.assertFalse(os.path.isfile(b'foo')) | ||
Martijn Pieters
|
r29394 | |||
Greg Ward
|
r14007 | if __name__ == '__main__': | ||
Pulkit Goyal
|
r29194 | import silenttestrunner | ||
Idan Kamara
|
r18666 | silenttestrunner.main(__name__) | ||