diff --git a/IPython/utils/io.py b/IPython/utils/io.py index df1e39e..3d236eb 100644 --- a/IPython/utils/io.py +++ b/IPython/utils/io.py @@ -267,17 +267,18 @@ def atomic_writing(path, text=True, encoding='utf-8', **kwargs): path = os.path.join(os.path.dirname(path), os.readlink(path)) dirname, basename = os.path.split(path) - handle, tmp_path = tempfile.mkstemp(prefix=basename, dir=dirname) + tmp_dir = tempfile.mkdtemp(prefix=basename, dir=dirname) + tmp_path = os.path.join(tmp_dir, basename) if text: - fileobj = io.open(handle, 'w', encoding=encoding, **kwargs) + fileobj = io.open(tmp_path, 'w', encoding=encoding, **kwargs) else: - fileobj = io.open(handle, 'wb', **kwargs) + fileobj = io.open(tmp_path, 'wb', **kwargs) try: yield fileobj except: fileobj.close() - os.remove(tmp_path) + shutil.rmtree(tmp_dir) raise # Flush to disk @@ -299,6 +300,7 @@ def atomic_writing(path, text=True, encoding='utf-8', **kwargs): os.remove(path) os.rename(tmp_path, path) + shutil.rmtree(tmp_dir) def raw_print(*args, **kw): diff --git a/IPython/utils/tests/test_io.py b/IPython/utils/tests/test_io.py index 023c964..aa00a88 100644 --- a/IPython/utils/tests/test_io.py +++ b/IPython/utils/tests/test_io.py @@ -1,16 +1,9 @@ # encoding: utf-8 """Tests for io.py""" -#----------------------------------------------------------------------------- -# Copyright (C) 2008-2011 The IPython Development Team -# -# Distributed under the terms of the BSD License. The full license is in -# the file COPYING, distributed as part of this software. -#----------------------------------------------------------------------------- - -#----------------------------------------------------------------------------- -# Imports -#----------------------------------------------------------------------------- +# Copyright (c) IPython Development Team. +# Distributed under the terms of the Modified BSD License. + from __future__ import print_function from __future__ import absolute_import @@ -24,7 +17,7 @@ import unittest import nose.tools as nt -from IPython.testing.decorators import skipif +from IPython.testing.decorators import skipif, skip_win32 from IPython.utils.io import (Tee, capture_output, unicode_std_stream, atomic_writing, ) @@ -36,10 +29,6 @@ if PY3: else: from StringIO import StringIO -#----------------------------------------------------------------------------- -# Tests -#----------------------------------------------------------------------------- - def test_tee_simple(): "Very simple check with stdout only" @@ -177,6 +166,33 @@ def test_atomic_writing(): with stdlib_io.open(f1, 'r') as f: nt.assert_equal(f.read(), u'written from symlink') +def _save_umask(): + global umask + umask = os.umask(0) + os.umask(umask) + +def _restore_umask(): + os.umask(umask) + +@skip_win32 +@nt.with_setup(_save_umask, _restore_umask) +def test_atomic_writing_umask(): + with TemporaryDirectory() as td: + os.umask(0o022) + f1 = os.path.join(td, '1') + with atomic_writing(f1) as f: + f.write(u'1') + mode = stat.S_IMODE(os.stat(f1).st_mode) + nt.assert_equal(mode, 0o644, '{:o} != 644'.format(mode)) + + os.umask(0o057) + f2 = os.path.join(td, '2') + with atomic_writing(f2) as f: + f.write(u'2') + mode = stat.S_IMODE(os.stat(f2).st_mode) + nt.assert_equal(mode, 0o620, '{:o} != 620'.format(mode)) + + def test_atomic_writing_newlines(): with TemporaryDirectory() as td: path = os.path.join(td, 'testfile')