Show More
@@ -1453,11 +1453,12 b' class atomictempfile(object):' | |||
|
1453 | 1453 | visible. If the object is destroyed without being closed, all your |
|
1454 | 1454 | writes are discarded. |
|
1455 | 1455 | ''' |
|
1456 | def __init__(self, name, mode='w+b', createmode=None): | |
|
1456 | def __init__(self, name, mode='w+b', createmode=None, checkambig=False): | |
|
1457 | 1457 | self.__name = name # permanent name |
|
1458 | 1458 | self._tempname = mktempcopy(name, emptyok=('w' in mode), |
|
1459 | 1459 | createmode=createmode) |
|
1460 | 1460 | self._fp = posixfile(self._tempname, mode) |
|
1461 | self._checkambig = checkambig | |
|
1461 | 1462 | |
|
1462 | 1463 | # delegated methods |
|
1463 | 1464 | self.write = self._fp.write |
@@ -1468,7 +1469,17 b' class atomictempfile(object):' | |||
|
1468 | 1469 | def close(self): |
|
1469 | 1470 | if not self._fp.closed: |
|
1470 | 1471 | self._fp.close() |
|
1471 |
|
|
|
1472 | filename = localpath(self.__name) | |
|
1473 | oldstat = self._checkambig and filestat(filename) | |
|
1474 | if oldstat and oldstat.stat: | |
|
1475 | rename(self._tempname, filename) | |
|
1476 | newstat = filestat(filename) | |
|
1477 | if newstat.isambig(oldstat): | |
|
1478 | # stat of changed file is ambiguous to original one | |
|
1479 | advanced = (oldstat.stat.st_mtime + 1) & 0x7fffffff | |
|
1480 | os.utime(filename, (advanced, advanced)) | |
|
1481 | else: | |
|
1482 | rename(self._tempname, filename) | |
|
1472 | 1483 | |
|
1473 | 1484 | def discard(self): |
|
1474 | 1485 | if not self._fp.closed: |
@@ -42,6 +42,46 b' class testatomictempfile(unittest.TestCa' | |||
|
42 | 42 | def test3_oops(self): |
|
43 | 43 | self.assertRaises(TypeError, atomictempfile) |
|
44 | 44 | |
|
45 | # checkambig=True avoids ambiguity of timestamp | |
|
46 | def test4_checkambig(self): | |
|
47 | def atomicwrite(checkambig): | |
|
48 | f = atomictempfile('foo', checkambig=checkambig) | |
|
49 | f.write('FOO') | |
|
50 | f.close() | |
|
51 | ||
|
52 | # try some times, because reproduction of ambiguity depends on | |
|
53 | # "filesystem time" | |
|
54 | for i in xrange(5): | |
|
55 | atomicwrite(False) | |
|
56 | oldstat = os.stat('foo') | |
|
57 | if oldstat.st_ctime != oldstat.st_mtime: | |
|
58 | # subsequent changing never causes ambiguity | |
|
59 | continue | |
|
60 | ||
|
61 | repetition = 3 | |
|
62 | ||
|
63 | # repeat atomic write with checkambig=True, to examine | |
|
64 | # whether st_mtime is advanced multiple times as expecetd | |
|
65 | for j in xrange(repetition): | |
|
66 | atomicwrite(True) | |
|
67 | newstat = os.stat('foo') | |
|
68 | if oldstat.st_ctime != newstat.st_ctime: | |
|
69 | # timestamp ambiguity was naturally avoided while repetition | |
|
70 | continue | |
|
71 | ||
|
72 | # st_mtime should be advanced "repetition" times, because | |
|
73 | # all atomicwrite() occured at same time (in sec) | |
|
74 | self.assertTrue(newstat.st_mtime == | |
|
75 | ((oldstat.st_mtime + repetition) & 0x7fffffff)) | |
|
76 | # no more examination is needed, if assumption above is true | |
|
77 | break | |
|
78 | else: | |
|
79 | # This platform seems too slow to examine anti-ambiguity | |
|
80 | # of file timestamp (or test happened to be executed at | |
|
81 | # bad timing). Exit silently in this case, because running | |
|
82 | # on other faster platforms can detect problems | |
|
83 | pass | |
|
84 | ||
|
45 | 85 | if __name__ == '__main__': |
|
46 | 86 | import silenttestrunner |
|
47 | 87 | silenttestrunner.main(__name__) |
General Comments 0
You need to be logged in to leave comments.
Login now