Show More
@@ -267,17 +267,18 b" def atomic_writing(path, text=True, encoding='utf-8', **kwargs):" | |||
|
267 | 267 | path = os.path.join(os.path.dirname(path), os.readlink(path)) |
|
268 | 268 | |
|
269 | 269 | dirname, basename = os.path.split(path) |
|
270 |
|
|
|
270 | tmp_dir = tempfile.mkdtemp(prefix=basename, dir=dirname) | |
|
271 | tmp_path = os.path.join(tmp_dir, basename) | |
|
271 | 272 | if text: |
|
272 |
fileobj = io.open( |
|
|
273 | fileobj = io.open(tmp_path, 'w', encoding=encoding, **kwargs) | |
|
273 | 274 | else: |
|
274 |
fileobj = io.open( |
|
|
275 | fileobj = io.open(tmp_path, 'wb', **kwargs) | |
|
275 | 276 | |
|
276 | 277 | try: |
|
277 | 278 | yield fileobj |
|
278 | 279 | except: |
|
279 | 280 | fileobj.close() |
|
280 |
|
|
|
281 | shutil.rmtree(tmp_dir) | |
|
281 | 282 | raise |
|
282 | 283 | |
|
283 | 284 | # Flush to disk |
@@ -299,6 +300,7 b" def atomic_writing(path, text=True, encoding='utf-8', **kwargs):" | |||
|
299 | 300 | os.remove(path) |
|
300 | 301 | |
|
301 | 302 | os.rename(tmp_path, path) |
|
303 | shutil.rmtree(tmp_dir) | |
|
302 | 304 | |
|
303 | 305 | |
|
304 | 306 | def raw_print(*args, **kw): |
@@ -1,16 +1,9 b'' | |||
|
1 | 1 | # encoding: utf-8 |
|
2 | 2 | """Tests for io.py""" |
|
3 | 3 | |
|
4 | #----------------------------------------------------------------------------- | |
|
5 | # Copyright (C) 2008-2011 The IPython Development Team | |
|
6 | # | |
|
7 | # Distributed under the terms of the BSD License. The full license is in | |
|
8 | # the file COPYING, distributed as part of this software. | |
|
9 | #----------------------------------------------------------------------------- | |
|
10 | ||
|
11 | #----------------------------------------------------------------------------- | |
|
12 | # Imports | |
|
13 | #----------------------------------------------------------------------------- | |
|
4 | # Copyright (c) IPython Development Team. | |
|
5 | # Distributed under the terms of the Modified BSD License. | |
|
6 | ||
|
14 | 7 | from __future__ import print_function |
|
15 | 8 | from __future__ import absolute_import |
|
16 | 9 | |
@@ -24,7 +17,7 b' import unittest' | |||
|
24 | 17 | |
|
25 | 18 | import nose.tools as nt |
|
26 | 19 | |
|
27 | from IPython.testing.decorators import skipif | |
|
20 | from IPython.testing.decorators import skipif, skip_win32 | |
|
28 | 21 | from IPython.utils.io import (Tee, capture_output, unicode_std_stream, |
|
29 | 22 | atomic_writing, |
|
30 | 23 | ) |
@@ -36,10 +29,6 b' if PY3:' | |||
|
36 | 29 | else: |
|
37 | 30 | from StringIO import StringIO |
|
38 | 31 | |
|
39 | #----------------------------------------------------------------------------- | |
|
40 | # Tests | |
|
41 | #----------------------------------------------------------------------------- | |
|
42 | ||
|
43 | 32 | |
|
44 | 33 | def test_tee_simple(): |
|
45 | 34 | "Very simple check with stdout only" |
@@ -177,6 +166,33 b' def test_atomic_writing():' | |||
|
177 | 166 | with stdlib_io.open(f1, 'r') as f: |
|
178 | 167 | nt.assert_equal(f.read(), u'written from symlink') |
|
179 | 168 | |
|
169 | def _save_umask(): | |
|
170 | global umask | |
|
171 | umask = os.umask(0) | |
|
172 | os.umask(umask) | |
|
173 | ||
|
174 | def _restore_umask(): | |
|
175 | os.umask(umask) | |
|
176 | ||
|
177 | @skip_win32 | |
|
178 | @nt.with_setup(_save_umask, _restore_umask) | |
|
179 | def test_atomic_writing_umask(): | |
|
180 | with TemporaryDirectory() as td: | |
|
181 | os.umask(0o022) | |
|
182 | f1 = os.path.join(td, '1') | |
|
183 | with atomic_writing(f1) as f: | |
|
184 | f.write(u'1') | |
|
185 | mode = stat.S_IMODE(os.stat(f1).st_mode) | |
|
186 | nt.assert_equal(mode, 0o644, '{:o} != 644'.format(mode)) | |
|
187 | ||
|
188 | os.umask(0o057) | |
|
189 | f2 = os.path.join(td, '2') | |
|
190 | with atomic_writing(f2) as f: | |
|
191 | f.write(u'2') | |
|
192 | mode = stat.S_IMODE(os.stat(f2).st_mode) | |
|
193 | nt.assert_equal(mode, 0o620, '{:o} != 620'.format(mode)) | |
|
194 | ||
|
195 | ||
|
180 | 196 | def test_atomic_writing_newlines(): |
|
181 | 197 | with TemporaryDirectory() as td: |
|
182 | 198 | path = os.path.join(td, 'testfile') |
General Comments 0
You need to be logged in to leave comments.
Login now