##// END OF EJS Templates
Merge pull request #7469 from minrk/atomic-umask...
Matthias Bussonnier -
r19948:7b164474 merge
parent child Browse files
Show More
@@ -267,17 +267,18 b" def atomic_writing(path, text=True, encoding='utf-8', **kwargs):"
267 path = os.path.join(os.path.dirname(path), os.readlink(path))
267 path = os.path.join(os.path.dirname(path), os.readlink(path))
268
268
269 dirname, basename = os.path.split(path)
269 dirname, basename = os.path.split(path)
270 handle, tmp_path = tempfile.mkstemp(prefix=basename, dir=dirname)
270 tmp_dir = tempfile.mkdtemp(prefix=basename, dir=dirname)
271 tmp_path = os.path.join(tmp_dir, basename)
271 if text:
272 if text:
272 fileobj = io.open(handle, 'w', encoding=encoding, **kwargs)
273 fileobj = io.open(tmp_path, 'w', encoding=encoding, **kwargs)
273 else:
274 else:
274 fileobj = io.open(handle, 'wb', **kwargs)
275 fileobj = io.open(tmp_path, 'wb', **kwargs)
275
276
276 try:
277 try:
277 yield fileobj
278 yield fileobj
278 except:
279 except:
279 fileobj.close()
280 fileobj.close()
280 os.remove(tmp_path)
281 shutil.rmtree(tmp_dir)
281 raise
282 raise
282
283
283 # Flush to disk
284 # Flush to disk
@@ -299,6 +300,7 b" def atomic_writing(path, text=True, encoding='utf-8', **kwargs):"
299 os.remove(path)
300 os.remove(path)
300
301
301 os.rename(tmp_path, path)
302 os.rename(tmp_path, path)
303 shutil.rmtree(tmp_dir)
302
304
303
305
304 def raw_print(*args, **kw):
306 def raw_print(*args, **kw):
@@ -1,16 +1,9 b''
1 # encoding: utf-8
1 # encoding: utf-8
2 """Tests for io.py"""
2 """Tests for io.py"""
3
3
4 #-----------------------------------------------------------------------------
4 # Copyright (c) IPython Development Team.
5 # Copyright (C) 2008-2011 The IPython Development Team
5 # Distributed under the terms of the Modified BSD License.
6 #
6
7 # Distributed under the terms of the BSD License. The full license is in
8 # the file COPYING, distributed as part of this software.
9 #-----------------------------------------------------------------------------
10
11 #-----------------------------------------------------------------------------
12 # Imports
13 #-----------------------------------------------------------------------------
14 from __future__ import print_function
7 from __future__ import print_function
15 from __future__ import absolute_import
8 from __future__ import absolute_import
16
9
@@ -24,7 +17,7 b' import unittest'
24
17
25 import nose.tools as nt
18 import nose.tools as nt
26
19
27 from IPython.testing.decorators import skipif
20 from IPython.testing.decorators import skipif, skip_win32
28 from IPython.utils.io import (Tee, capture_output, unicode_std_stream,
21 from IPython.utils.io import (Tee, capture_output, unicode_std_stream,
29 atomic_writing,
22 atomic_writing,
30 )
23 )
@@ -36,10 +29,6 b' if PY3:'
36 else:
29 else:
37 from StringIO import StringIO
30 from StringIO import StringIO
38
31
39 #-----------------------------------------------------------------------------
40 # Tests
41 #-----------------------------------------------------------------------------
42
43
32
44 def test_tee_simple():
33 def test_tee_simple():
45 "Very simple check with stdout only"
34 "Very simple check with stdout only"
@@ -177,6 +166,33 b' def test_atomic_writing():'
177 with stdlib_io.open(f1, 'r') as f:
166 with stdlib_io.open(f1, 'r') as f:
178 nt.assert_equal(f.read(), u'written from symlink')
167 nt.assert_equal(f.read(), u'written from symlink')
179
168
169 def _save_umask():
170 global umask
171 umask = os.umask(0)
172 os.umask(umask)
173
174 def _restore_umask():
175 os.umask(umask)
176
177 @skip_win32
178 @nt.with_setup(_save_umask, _restore_umask)
179 def test_atomic_writing_umask():
180 with TemporaryDirectory() as td:
181 os.umask(0o022)
182 f1 = os.path.join(td, '1')
183 with atomic_writing(f1) as f:
184 f.write(u'1')
185 mode = stat.S_IMODE(os.stat(f1).st_mode)
186 nt.assert_equal(mode, 0o644, '{:o} != 644'.format(mode))
187
188 os.umask(0o057)
189 f2 = os.path.join(td, '2')
190 with atomic_writing(f2) as f:
191 f.write(u'2')
192 mode = stat.S_IMODE(os.stat(f2).st_mode)
193 nt.assert_equal(mode, 0o620, '{:o} != 620'.format(mode))
194
195
180 def test_atomic_writing_newlines():
196 def test_atomic_writing_newlines():
181 with TemporaryDirectory() as td:
197 with TemporaryDirectory() as td:
182 path = os.path.join(td, 'testfile')
198 path = os.path.join(td, 'testfile')
General Comments 0
You need to be logged in to leave comments. Login now