##// END OF EJS Templates
progress: extract stubs to restart ferr.flush() and .write() on EINTR
progress: extract stubs to restart ferr.flush() and .write() on EINTR

File last commit:

r30332:318a24b5 default
r32048:c3ef33fd default
Show More
test-atomictempfile.py
119 lines | 4.1 KiB | text/x-python | PythonLexer
/ tests / test-atomictempfile.py
Pulkit Goyal
py3: make tests/test-atomictempfile.py use absolute_import
r29194 from __future__ import absolute_import
import glob
Greg Ward
atomictempfile: avoid infinite recursion in __del__()....
r14007 import os
Martijn Pieters
atomictempfile: use a tempdir to keep the test environment clean...
r29391 import shutil
import tempfile
Idan Kamara
test-atomictempfile: convert to unit test
r18666 import unittest
Pulkit Goyal
py3: make tests/test-atomictempfile.py use absolute_import
r29194 from mercurial import (
util,
)
# module-level shorthand for the class under test; the test class below
# calls it as a plain name
atomictempfile = util.atomictempfile
Greg Ward
atomictempfile: avoid infinite recursion in __del__()....
r14007
Idan Kamara
test-atomictempfile: convert to unit test
class testatomictempfile(unittest.TestCase):
    """Unit tests for util.atomictempfile.

    Every test operates on a target path inside a per-test temporary
    directory that setUp() creates and tearDown() removes, so no test
    leaves files behind in the current working directory.
    """

    def setUp(self):
        # a private directory per test keeps the tests independent of
        # each other and of whatever is in the working directory
        self._testdir = tempfile.mkdtemp('atomictempfiletest')
        self._filename = os.path.join(self._testdir, 'testfilename')

    def tearDown(self):
        # second argument is ignore_errors: cleanup is best effort
        shutil.rmtree(self._testdir, True)

    def _tempfiles(self):
        # paths in the test directory matching the '.testfilename-*'
        # pattern that the temp file is expected to be created under
        return glob.glob(os.path.join(self._testdir, '.testfilename-*'))

    # close() makes the write permanent and removes the temp file
    def testsimple(self):
        atomicfile = atomictempfile(self._filename)
        # nothing is visible at the target path until close()
        self.assertFalse(os.path.isfile(self._filename))
        tempfilename = atomicfile._tempname
        self.assertTrue(tempfilename in self._tempfiles())

        atomicfile.write(b'argh\n')
        atomicfile.close()

        self.assertTrue(os.path.isfile(self._filename))
        self.assertTrue(tempfilename not in self._tempfiles())

    # discard() removes the temp file without making the write permanent
    def testdiscard(self):
        atomicfile = atomictempfile(self._filename)
        tempdir, basename = os.path.split(atomicfile._tempname)

        atomicfile.write(b'yo\n')
        atomicfile.discard()

        self.assertFalse(os.path.isfile(self._filename))
        # check the directory that actually held the temp file; listing
        # the current working directory would pass vacuously
        self.assertTrue(basename not in os.listdir(tempdir))

    # if a programmer screws up and passes bad args to atomictempfile, they
    # get a plain ordinary TypeError, not infinite recursion
    def testoops(self):
        self.assertRaises(TypeError, atomictempfile)

    # checkambig=True avoids ambiguity of timestamp
    def testcheckambig(self):
        def atomicwrite(checkambig):
            f = atomictempfile(self._filename, checkambig=checkambig)
            f.write(b'FOO')
            f.close()

        # try some times, because reproduction of ambiguity depends on
        # "filesystem time"
        for _attempt in range(5):
            atomicwrite(False)
            oldstat = os.stat(self._filename)
            if oldstat.st_ctime != oldstat.st_mtime:
                # subsequent changing never causes ambiguity
                continue

            repetition = 3

            # repeat atomic write with checkambig=True, to examine
            # whether st_mtime is advanced multiple times as expected
            for _write in range(repetition):
                atomicwrite(True)
            newstat = os.stat(self._filename)
            if oldstat.st_ctime != newstat.st_ctime:
                # timestamp ambiguity was naturally avoided while repetition
                continue

            # st_mtime should be advanced "repetition" times, because
            # all atomicwrite() occurred at same time (in sec)
            #
            # NOTE(review): the bitwise AND needs integer st_mtime values;
            # presumably importing mercurial.util arranges for os.stat() to
            # report integer times, and the 0x7fffffff mask presumably
            # mirrors the wrap-around applied by the implementation --
            # confirm both against util
            self.assertEqual(newstat.st_mtime,
                             (oldstat.st_mtime + repetition) & 0x7fffffff)
            # no more examination is needed, if assumption above is true
            break
        else:
            # This platform seems too slow to examine anti-ambiguity
            # of file timestamp (or test happened to be executed at
            # bad timing). Exit silently in this case, because running
            # on other faster platforms can detect problems
            pass

    # a file opened with mode='rb' can be read through atomictempfile
    def testread(self):
        with open(self._filename, 'wb') as f:
            f.write(b'foobar\n')
        atomicfile = atomictempfile(self._filename, mode='rb')
        # assertEqual, not assertTrue: assertTrue would take the expected
        # bytes as the failure message and never compare the content
        self.assertEqual(atomicfile.read(), b'foobar\n')
        atomicfile.discard()

    def testcontextmanagersuccess(self):
        """When the context closes, the file is closed"""
        # the target lives in the per-test directory so that tearDown()
        # removes it and a stray file in the cwd cannot affect the result
        with atomictempfile(self._filename) as f:
            self.assertFalse(os.path.isfile(self._filename))
            f.write(b'argh\n')
        self.assertTrue(os.path.isfile(self._filename))

    def testcontextmanagerfailure(self):
        """On exception, the file is discarded"""
        try:
            with atomictempfile(self._filename) as f:
                self.assertFalse(os.path.isfile(self._filename))
                f.write(b'argh\n')
                raise ValueError
        except ValueError:
            pass
        self.assertFalse(os.path.isfile(self._filename))
Greg Ward
atomictempfile: avoid infinite recursion in __del__()....
if __name__ == '__main__':
    # silenttestrunner is used only when this file is run as a script,
    # so it is imported here rather than at module level
    import silenttestrunner
    silenttestrunner.main(__name__)