Show More
@@ -19,6 +19,8 b' import codecs' | |||||
19 | from contextlib import contextmanager |
|
19 | from contextlib import contextmanager | |
20 | import io |
|
20 | import io | |
21 | import os |
|
21 | import os | |
|
22 | import shutil | |||
|
23 | import stat | |||
22 | import sys |
|
24 | import sys | |
23 | import tempfile |
|
25 | import tempfile | |
24 | from .capture import CapturedIO, capture_output |
|
26 | from .capture import CapturedIO, capture_output | |
@@ -219,6 +221,17 b" def temp_pyfile(src, ext='.py'):" | |||||
219 | f.flush() |
|
221 | f.flush() | |
220 | return fname, f |
|
222 | return fname, f | |
221 |
|
223 | |||
|
224 | def _copy_metadata(src, dst): | |||
|
225 | """Copy the set of metadata we want for atomic_writing. | |||
|
226 | ||||
|
227 | Permission bits and flags. We'd like to copy file ownership as well, but we | |||
|
228 | can't do that. | |||
|
229 | """ | |||
|
230 | shutil.copymode(src, dst) | |||
|
231 | st = os.stat(src) | |||
|
232 | if hasattr(os, 'chflags') and hasattr(st, 'st_flags'): | |||
|
233 | os.chflags(st.st_flags) | |||
|
234 | ||||
222 | @contextmanager |
|
235 | @contextmanager | |
223 | def atomic_writing(path, text=True, encoding='utf-8', **kwargs): |
|
236 | def atomic_writing(path, text=True, encoding='utf-8', **kwargs): | |
224 | """Context manager to write to a file only if the entire write is successful. |
|
237 | """Context manager to write to a file only if the entire write is successful. | |
@@ -247,6 +260,7 b" def atomic_writing(path, text=True, encoding='utf-8', **kwargs):" | |||||
247 | **kwargs |
|
260 | **kwargs | |
248 | Passed to :func:`io.open`. |
|
261 | Passed to :func:`io.open`. | |
249 | """ |
|
262 | """ | |
|
263 | path = os.path.realpath(path) # Dereference symlinks | |||
250 | dirname, basename = os.path.split(path) |
|
264 | dirname, basename = os.path.split(path) | |
251 | handle, tmp_path = tempfile.mkstemp(prefix=basename, dir=dirname, text=text) |
|
265 | handle, tmp_path = tempfile.mkstemp(prefix=basename, dir=dirname, text=text) | |
252 | if text: |
|
266 | if text: | |
@@ -268,6 +282,13 b" def atomic_writing(path, text=True, encoding='utf-8', **kwargs):" | |||||
268 | # Written successfully, now rename it |
|
282 | # Written successfully, now rename it | |
269 | fileobj.close() |
|
283 | fileobj.close() | |
270 |
|
284 | |||
|
285 | # Copy permission bits, access time, etc. | |||
|
286 | try: | |||
|
287 | _copy_metadata(path, tmp_path) | |||
|
288 | except OSError: | |||
|
289 | # e.g. the file didn't already exist. Ignore any failure to copy metadata | |||
|
290 | pass | |||
|
291 | ||||
271 | if os.name == 'nt' and os.path.exists(path): |
|
292 | if os.name == 'nt' and os.path.exists(path): | |
272 | # Rename over existing file doesn't work on Windows |
|
293 | # Rename over existing file doesn't work on Windows | |
273 | os.remove(path) |
|
294 | os.remove(path) |
@@ -16,6 +16,7 b' from __future__ import absolute_import' | |||||
16 |
|
16 | |||
17 | import io as stdlib_io |
|
17 | import io as stdlib_io | |
18 | import os.path |
|
18 | import os.path | |
|
19 | import stat | |||
19 | import sys |
|
20 | import sys | |
20 |
|
21 | |||
21 | from subprocess import Popen, PIPE |
|
22 | from subprocess import Popen, PIPE | |
@@ -134,6 +135,17 b' def test_atomic_writing():' | |||||
134 | f1 = os.path.join(td, 'penguin') |
|
135 | f1 = os.path.join(td, 'penguin') | |
135 | with stdlib_io.open(f1, 'w') as f: |
|
136 | with stdlib_io.open(f1, 'w') as f: | |
136 | f.write(u'Before') |
|
137 | f.write(u'Before') | |
|
138 | ||||
|
139 | if os.name != 'nt': | |||
|
140 | os.chmod(f1, 0o701) | |||
|
141 | orig_mode = stat.S_IMODE(os.stat(f1).st_mode) | |||
|
142 | ||||
|
143 | f2 = os.path.join(td, 'flamingo') | |||
|
144 | try: | |||
|
145 | os.symlink(f1, f2) | |||
|
146 | have_symlink = True | |||
|
147 | except (AttributeError, NotImplementedError): | |||
|
148 | have_symlink = False | |||
137 |
|
149 | |||
138 | with nt.assert_raises(CustomExc): |
|
150 | with nt.assert_raises(CustomExc): | |
139 | with atomic_writing(f1) as f: |
|
151 | with atomic_writing(f1) as f: | |
@@ -149,3 +161,15 b' def test_atomic_writing():' | |||||
149 |
|
161 | |||
150 | with stdlib_io.open(f1, 'r') as f: |
|
162 | with stdlib_io.open(f1, 'r') as f: | |
151 | nt.assert_equal(f.read(), u'Overwritten') |
|
163 | nt.assert_equal(f.read(), u'Overwritten') | |
|
164 | ||||
|
165 | if os.name != 'nt': | |||
|
166 | mode = stat.S_IMODE(os.stat(f1).st_mode) | |||
|
167 | nt.assert_equal(mode, orig_mode) | |||
|
168 | ||||
|
169 | if have_symlink: | |||
|
170 | # Check that writing over a file preserves a symlink | |||
|
171 | with atomic_writing(f2) as f: | |||
|
172 | f.write(u'written from symlink') | |||
|
173 | ||||
|
174 | with stdlib_io.open(f1, 'r') as f: | |||
|
175 | nt.assert_equal(f.read(), u'written from symlink') No newline at end of file |
General Comments 0
You need to be logged in to leave comments.
Login now