Show More
@@ -19,6 +19,8 b' import codecs' | |||||
19 | from contextlib import contextmanager |
|
19 | from contextlib import contextmanager | |
20 | import io |
|
20 | import io | |
21 | import os |
|
21 | import os | |
|
22 | import shutil | |||
|
23 | import stat | |||
22 | import sys |
|
24 | import sys | |
23 | import tempfile |
|
25 | import tempfile | |
24 | from .capture import CapturedIO, capture_output |
|
26 | from .capture import CapturedIO, capture_output | |
@@ -219,6 +221,17 b" def temp_pyfile(src, ext='.py'):" | |||||
219 | f.flush() |
|
221 | f.flush() | |
220 | return fname, f |
|
222 | return fname, f | |
221 |
|
223 | |||
|
224 | def _copy_metadata(src, dst): | |||
|
225 | """Copy the set of metadata we want for atomic_writing. | |||
|
226 | ||||
|
227 | Permission bits and flags. We'd like to copy file ownership as well, but we | |||
|
228 | can't do that. | |||
|
229 | """ | |||
|
230 | shutil.copymode(src, dst) | |||
|
231 | st = os.stat(src) | |||
|
232 | if hasattr(os, 'chflags') and hasattr(st, 'st_flags'): | |||
|
233 | os.chflags(st.st_flags) | |||
|
234 | ||||
222 | @contextmanager |
|
235 | @contextmanager | |
223 | def atomic_writing(path, text=True, encoding='utf-8', **kwargs): |
|
236 | def atomic_writing(path, text=True, encoding='utf-8', **kwargs): | |
224 | """Context manager to write to a file only if the entire write is successful. |
|
237 | """Context manager to write to a file only if the entire write is successful. | |
@@ -268,6 +281,13 b" def atomic_writing(path, text=True, encoding='utf-8', **kwargs):" | |||||
268 | # Written successfully, now rename it |
|
281 | # Written successfully, now rename it | |
269 | fileobj.close() |
|
282 | fileobj.close() | |
270 |
|
283 | |||
|
284 | # Copy permission bits, access time, etc. | |||
|
285 | try: | |||
|
286 | _copy_metadata(path, tmp_path) | |||
|
287 | except OSError: | |||
|
288 | # e.g. the file didn't already exist. Ignore any failure to copy metadata | |||
|
289 | pass | |||
|
290 | ||||
271 | if os.name == 'nt' and os.path.exists(path): |
|
291 | if os.name == 'nt' and os.path.exists(path): | |
272 | # Rename over existing file doesn't work on Windows |
|
292 | # Rename over existing file doesn't work on Windows | |
273 | os.remove(path) |
|
293 | os.remove(path) |
@@ -16,6 +16,7 b' from __future__ import absolute_import' | |||||
16 |
|
16 | |||
17 | import io as stdlib_io |
|
17 | import io as stdlib_io | |
18 | import os.path |
|
18 | import os.path | |
|
19 | import stat | |||
19 | import sys |
|
20 | import sys | |
20 |
|
21 | |||
21 | from subprocess import Popen, PIPE |
|
22 | from subprocess import Popen, PIPE | |
@@ -134,6 +135,10 b' def test_atomic_writing():' | |||||
134 | f1 = os.path.join(td, 'penguin') |
|
135 | f1 = os.path.join(td, 'penguin') | |
135 | with stdlib_io.open(f1, 'w') as f: |
|
136 | with stdlib_io.open(f1, 'w') as f: | |
136 | f.write(u'Before') |
|
137 | f.write(u'Before') | |
|
138 | ||||
|
139 | if os.name != 'nt': | |||
|
140 | os.chmod(f1, 0o701) | |||
|
141 | orig_mode = stat.S_IMODE(os.stat(f1).st_mode) | |||
137 |
|
142 | |||
138 | with nt.assert_raises(CustomExc): |
|
143 | with nt.assert_raises(CustomExc): | |
139 | with atomic_writing(f1) as f: |
|
144 | with atomic_writing(f1) as f: | |
@@ -149,3 +154,7 b' def test_atomic_writing():' | |||||
149 |
|
154 | |||
150 | with stdlib_io.open(f1, 'r') as f: |
|
155 | with stdlib_io.open(f1, 'r') as f: | |
151 | nt.assert_equal(f.read(), u'Overwritten') |
|
156 | nt.assert_equal(f.read(), u'Overwritten') | |
|
157 | ||||
|
158 | if os.name != 'nt': | |||
|
159 | mode = stat.S_IMODE(os.stat(f1).st_mode) | |||
|
160 | nt.assert_equal(mode, orig_mode) |
General Comments 0
You need to be logged in to leave comments.
Login now