Show More
@@ -1,124 +1,124 | |||
|
1 | 1 | from __future__ import absolute_import |
|
2 | 2 | |
|
3 | 3 | import glob |
|
4 | 4 | import os |
|
5 | 5 | import shutil |
|
6 | 6 | import tempfile |
|
7 | 7 | import unittest |
|
8 | 8 | |
|
9 | 9 | from mercurial import ( |
|
10 | 10 | pycompat, |
|
11 | 11 | util, |
|
12 | 12 | ) |
|
13 | 13 | atomictempfile = util.atomictempfile |
|
14 | 14 | |
|
15 | 15 | if pycompat.ispy3: |
|
16 | 16 | xrange = range |
|
17 | 17 | |
|
18 | 18 | class testatomictempfile(unittest.TestCase): |
|
19 | 19 | def setUp(self): |
|
20 | self._testdir = tempfile.mkdtemp('atomictempfiletest') | |
|
21 | self._filename = os.path.join(self._testdir, 'testfilename') | |
|
20 | self._testdir = tempfile.mkdtemp(b'atomictempfiletest') | |
|
21 | self._filename = os.path.join(self._testdir, b'testfilename') | |
|
22 | 22 | |
|
23 | 23 | def tearDown(self): |
|
24 | 24 | shutil.rmtree(self._testdir, True) |
|
25 | 25 | |
|
26 | 26 | def testsimple(self): |
|
27 | 27 | file = atomictempfile(self._filename) |
|
28 | 28 | self.assertFalse(os.path.isfile(self._filename)) |
|
29 | 29 | tempfilename = file._tempname |
|
30 | 30 | self.assertTrue(tempfilename in glob.glob( |
|
31 | os.path.join(self._testdir, '.testfilename-*'))) | |
|
31 | os.path.join(self._testdir, b'.testfilename-*'))) | |
|
32 | 32 | |
|
33 | 33 | file.write(b'argh\n') |
|
34 | 34 | file.close() |
|
35 | 35 | |
|
36 | 36 | self.assertTrue(os.path.isfile(self._filename)) |
|
37 | 37 | self.assertTrue(tempfilename not in glob.glob( |
|
38 | os.path.join(self._testdir, '.testfilename-*'))) | |
|
38 | os.path.join(self._testdir, b'.testfilename-*'))) | |
|
39 | 39 | |
|
40 | 40 | # discard() removes the temp file without making the write permanent |
|
41 | 41 | def testdiscard(self): |
|
42 | 42 | file = atomictempfile(self._filename) |
|
43 | 43 | (dir, basename) = os.path.split(file._tempname) |
|
44 | 44 | |
|
45 | 45 | file.write(b'yo\n') |
|
46 | 46 | file.discard() |
|
47 | 47 | |
|
48 | 48 | self.assertFalse(os.path.isfile(self._filename)) |
|
49 | self.assertTrue(basename not in os.listdir('.')) | |
|
49 | self.assertTrue(basename not in os.listdir(b'.')) | |
|
50 | 50 | |
|
51 | 51 | # if a programmer screws up and passes bad args to atomictempfile, they |
|
52 | 52 | # get a plain ordinary TypeError, not infinite recursion |
|
53 | 53 | def testoops(self): |
|
54 | 54 | with self.assertRaises(TypeError): |
|
55 | 55 | atomictempfile() |
|
56 | 56 | |
|
57 | 57 | # checkambig=True avoids ambiguity of timestamp |
|
58 | 58 | def testcheckambig(self): |
|
59 | 59 | def atomicwrite(checkambig): |
|
60 | 60 | f = atomictempfile(self._filename, checkambig=checkambig) |
|
61 | f.write('FOO') | |
|
61 | f.write(b'FOO') | |
|
62 | 62 | f.close() |
|
63 | 63 | |
|
64 | 64 | # try some times, because reproduction of ambiguity depends on |
|
65 | 65 | # "filesystem time" |
|
66 | 66 | for i in xrange(5): |
|
67 | 67 | atomicwrite(False) |
|
68 | 68 | oldstat = os.stat(self._filename) |
|
69 | 69 | if oldstat.st_ctime != oldstat.st_mtime: |
|
70 | 70 | # subsequent changing never causes ambiguity |
|
71 | 71 | continue |
|
72 | 72 | |
|
73 | 73 | repetition = 3 |
|
74 | 74 | |
|
75 | 75 | # repeat atomic write with checkambig=True, to examine |
|
76 | 76 | # whether st_mtime is advanced multiple times as expected |
|
77 | 77 | for j in xrange(repetition): |
|
78 | 78 | atomicwrite(True) |
|
79 | 79 | newstat = os.stat(self._filename) |
|
80 | 80 | if oldstat.st_ctime != newstat.st_ctime: |
|
81 | 81 | # timestamp ambiguity was naturally avoided while repetition |
|
82 | 82 | continue |
|
83 | 83 | |
|
84 | 84 | # st_mtime should be advanced "repetition" times, because |
|
85 | 85 | # all atomicwrite() occurred at same time (in sec) |
|
86 | 86 | self.assertTrue(newstat.st_mtime == |
|
87 | 87 | ((oldstat.st_mtime + repetition) & 0x7fffffff)) |
|
88 | 88 | # no more examination is needed, if assumption above is true |
|
89 | 89 | break |
|
90 | 90 | else: |
|
91 | 91 | # This platform seems too slow to examine anti-ambiguity |
|
92 | 92 | # of file timestamp (or test happened to be executed at |
|
93 | 93 | # bad timing). Exit silently in this case, because running |
|
94 | 94 | # on other faster platforms can detect problems |
|
95 | 95 | pass |
|
96 | 96 | |
|
97 | 97 | def testread(self): |
|
98 | 98 | with open(self._filename, 'wb') as f: |
|
99 | 99 | f.write(b'foobar\n') |
|
100 | file = atomictempfile(self._filename, mode='rb') | |
|
100 | file = atomictempfile(self._filename, mode=b'rb') | |
|
101 | 101 | self.assertTrue(file.read(), b'foobar\n') |
|
102 | 102 | file.discard() |
|
103 | 103 | |
|
104 | 104 | def testcontextmanagersuccess(self): |
|
105 | 105 | """When the context closes, the file is closed""" |
|
106 | with atomictempfile('foo') as f: | |
|
107 | self.assertFalse(os.path.isfile('foo')) | |
|
106 | with atomictempfile(b'foo') as f: | |
|
107 | self.assertFalse(os.path.isfile(b'foo')) | |
|
108 | 108 | f.write(b'argh\n') |
|
109 | self.assertTrue(os.path.isfile('foo')) | |
|
109 | self.assertTrue(os.path.isfile(b'foo')) | |
|
110 | 110 | |
|
111 | 111 | def testcontextmanagerfailure(self): |
|
112 | 112 | """On exception, the file is discarded""" |
|
113 | 113 | try: |
|
114 | with atomictempfile('foo') as f: | |
|
115 | self.assertFalse(os.path.isfile('foo')) | |
|
114 | with atomictempfile(b'foo') as f: | |
|
115 | self.assertFalse(os.path.isfile(b'foo')) | |
|
116 | 116 | f.write(b'argh\n') |
|
117 | 117 | raise ValueError |
|
118 | 118 | except ValueError: |
|
119 | 119 | pass |
|
120 | self.assertFalse(os.path.isfile('foo')) | |
|
120 | self.assertFalse(os.path.isfile(b'foo')) | |
|
121 | 121 | |
|
122 | 122 | if __name__ == '__main__': |
|
123 | 123 | import silenttestrunner |
|
124 | 124 | silenttestrunner.main(__name__) |
General Comments 0
You need to be logged in to leave comments.
Login now