diff --git a/IPython/html/services/contents/fileio.py b/IPython/html/services/contents/fileio.py
index 0f7ff5c..7fa09c2 100644
--- a/IPython/html/services/contents/fileio.py
+++ b/IPython/html/services/contents/fileio.py
@@ -11,6 +11,7 @@ import errno
import io
import os
import shutil
+import tempfile
from tornado.web import HTTPError
@@ -19,10 +20,91 @@ from IPython.html.utils import (
to_os_path,
)
from IPython import nbformat
-from IPython.utils.io import atomic_writing
from IPython.utils.py3compat import str_to_unicode
+def _copy_metadata(src, dst):
+ """Copy the set of metadata we want for atomic_writing.
+
+ Permission bits and flags. We'd like to copy file ownership as well, but we
+ can't do that.
+ """
+ shutil.copymode(src, dst)
+ st = os.stat(src)
+ if hasattr(os, 'chflags') and hasattr(st, 'st_flags'):
+ os.chflags(dst, st.st_flags)
+
+@contextmanager
+def atomic_writing(path, text=True, encoding='utf-8', **kwargs):
+ """Context manager to write to a file only if the entire write is successful.
+
+ This works by creating a temporary file in the same directory, and renaming
+ it over the old file if the context is exited without an error. If other
+ file names are hard linked to the target file, this relationship will not be
+ preserved.
+
+ On Windows, there is a small chink in the atomicity: the target file is
+ deleted before renaming the temporary file over it. This appears to be
+ unavoidable.
+
+ Parameters
+ ----------
+ path : str
+ The target file to write to.
+
+ text : bool, optional
+ Whether to open the file in text mode (i.e. to write unicode). Default is
+ True.
+
+ encoding : str, optional
+ The encoding to use for files opened in text mode. Default is UTF-8.
+
+ **kwargs
+ Passed to :func:`io.open`.
+ """
+ # realpath doesn't work on Windows: http://bugs.python.org/issue9949
+ # Luckily, we only need to resolve the file itself being a symlink, not
+ # any of its directories, so this will suffice:
+ if os.path.islink(path):
+ path = os.path.join(os.path.dirname(path), os.readlink(path))
+
+ dirname, basename = os.path.split(path)
+ tmp_dir = tempfile.mkdtemp(prefix=basename, dir=dirname)
+ tmp_path = os.path.join(tmp_dir, basename)
+ if text:
+ fileobj = io.open(tmp_path, 'w', encoding=encoding, **kwargs)
+ else:
+ fileobj = io.open(tmp_path, 'wb', **kwargs)
+
+ try:
+ yield fileobj
+ except:
+ fileobj.close()
+ shutil.rmtree(tmp_dir)
+ raise
+
+ # Flush to disk
+ fileobj.flush()
+ os.fsync(fileobj.fileno())
+
+ # Written successfully, now rename it
+ fileobj.close()
+
+ # Copy permission bits, access time, etc.
+ try:
+ _copy_metadata(path, tmp_path)
+ except OSError:
+ # e.g. the file didn't already exist. Ignore any failure to copy metadata
+ pass
+
+ if os.name == 'nt' and os.path.exists(path):
+ # Rename over existing file doesn't work on Windows
+ os.remove(path)
+
+ os.rename(tmp_path, path)
+ shutil.rmtree(tmp_dir)
+
+
class FileManagerMixin(object):
"""
Mixin for ContentsAPI classes that interact with the filesystem.
diff --git a/IPython/html/services/contents/tests/test_fileio.py b/IPython/html/services/contents/tests/test_fileio.py
new file mode 100644
index 0000000..b517280
--- /dev/null
+++ b/IPython/html/services/contents/tests/test_fileio.py
@@ -0,0 +1,131 @@
+# encoding: utf-8
+"""Tests for file IO"""
+
+# Copyright (c) IPython Development Team.
+# Distributed under the terms of the Modified BSD License.
+
+import io as stdlib_io
+import os.path
+import stat
+
+import nose.tools as nt
+
+from IPython.testing.decorators import skip_win32
+from ..fileio import atomic_writing
+
+from IPython.utils.tempdir import TemporaryDirectory
+
+umask = 0
+
+def test_atomic_writing():
+ class CustomExc(Exception): pass
+
+ with TemporaryDirectory() as td:
+ f1 = os.path.join(td, 'penguin')
+ with stdlib_io.open(f1, 'w') as f:
+ f.write(u'Before')
+
+ if os.name != 'nt':
+ os.chmod(f1, 0o701)
+ orig_mode = stat.S_IMODE(os.stat(f1).st_mode)
+
+ f2 = os.path.join(td, 'flamingo')
+ try:
+ os.symlink(f1, f2)
+ have_symlink = True
+ except (AttributeError, NotImplementedError, OSError):
+ # AttributeError: Python doesn't support it
+ # NotImplementedError: The system doesn't support it
+ # OSError: The user lacks the privilege (Windows)
+ have_symlink = False
+
+ with nt.assert_raises(CustomExc):
+ with atomic_writing(f1) as f:
+ f.write(u'Failing write')
+ raise CustomExc
+
+ # Because of the exception, the file should not have been modified
+ with stdlib_io.open(f1, 'r') as f:
+ nt.assert_equal(f.read(), u'Before')
+
+ with atomic_writing(f1) as f:
+ f.write(u'Overwritten')
+
+ with stdlib_io.open(f1, 'r') as f:
+ nt.assert_equal(f.read(), u'Overwritten')
+
+ if os.name != 'nt':
+ mode = stat.S_IMODE(os.stat(f1).st_mode)
+ nt.assert_equal(mode, orig_mode)
+
+ if have_symlink:
+ # Check that writing over a file preserves a symlink
+ with atomic_writing(f2) as f:
+ f.write(u'written from symlink')
+
+ with stdlib_io.open(f1, 'r') as f:
+ nt.assert_equal(f.read(), u'written from symlink')
+
+def _save_umask():
+ global umask
+ umask = os.umask(0)
+ os.umask(umask)
+
+def _restore_umask():
+ os.umask(umask)
+
+@skip_win32
+@nt.with_setup(_save_umask, _restore_umask)
+def test_atomic_writing_umask():
+ with TemporaryDirectory() as td:
+ os.umask(0o022)
+ f1 = os.path.join(td, '1')
+ with atomic_writing(f1) as f:
+ f.write(u'1')
+ mode = stat.S_IMODE(os.stat(f1).st_mode)
+ nt.assert_equal(mode, 0o644, '{:o} != 644'.format(mode))
+
+ os.umask(0o057)
+ f2 = os.path.join(td, '2')
+ with atomic_writing(f2) as f:
+ f.write(u'2')
+ mode = stat.S_IMODE(os.stat(f2).st_mode)
+ nt.assert_equal(mode, 0o620, '{:o} != 620'.format(mode))
+
+
+def test_atomic_writing_newlines():
+ with TemporaryDirectory() as td:
+ path = os.path.join(td, 'testfile')
+
+ lf = u'a\nb\nc\n'
+ plat = lf.replace(u'\n', os.linesep)
+ crlf = lf.replace(u'\n', u'\r\n')
+
+ # test default
+ with stdlib_io.open(path, 'w') as f:
+ f.write(lf)
+ with stdlib_io.open(path, 'r', newline='') as f:
+ read = f.read()
+ nt.assert_equal(read, plat)
+
+ # test newline=LF
+ with stdlib_io.open(path, 'w', newline='\n') as f:
+ f.write(lf)
+ with stdlib_io.open(path, 'r', newline='') as f:
+ read = f.read()
+ nt.assert_equal(read, lf)
+
+ # test newline=CRLF
+ with atomic_writing(path, newline='\r\n') as f:
+ f.write(lf)
+ with stdlib_io.open(path, 'r', newline='') as f:
+ read = f.read()
+ nt.assert_equal(read, crlf)
+
+ # test newline=no convert
+ text = u'crlf\r\ncr\rlf\n'
+ with atomic_writing(path, newline='') as f:
+ f.write(text)
+ with stdlib_io.open(path, 'r', newline='') as f:
+ read = f.read()
+ nt.assert_equal(read, text)
diff --git a/IPython/utils/io.py b/IPython/utils/io.py
index 8d15f11..6bed42e 100644
--- a/IPython/utils/io.py
+++ b/IPython/utils/io.py
@@ -212,87 +212,11 @@ def temp_pyfile(src, ext='.py'):
f.flush()
return fname, f
-def _copy_metadata(src, dst):
- """Copy the set of metadata we want for atomic_writing.
-
- Permission bits and flags. We'd like to copy file ownership as well, but we
- can't do that.
- """
- shutil.copymode(src, dst)
- st = os.stat(src)
- if hasattr(os, 'chflags') and hasattr(st, 'st_flags'):
- os.chflags(dst, st.st_flags)
-
-@contextmanager
-def atomic_writing(path, text=True, encoding='utf-8', **kwargs):
- """Context manager to write to a file only if the entire write is successful.
-
- This works by creating a temporary file in the same directory, and renaming
- it over the old file if the context is exited without an error. If other
- file names are hard linked to the target file, this relationship will not be
- preserved.
-
- On Windows, there is a small chink in the atomicity: the target file is
- deleted before renaming the temporary file over it. This appears to be
- unavoidable.
-
- Parameters
- ----------
- path : str
- The target file to write to.
-
- text : bool, optional
- Whether to open the file in text mode (i.e. to write unicode). Default is
- True.
-
- encoding : str, optional
- The encoding to use for files opened in text mode. Default is UTF-8.
-
- **kwargs
- Passed to :func:`io.open`.
- """
- # realpath doesn't work on Windows: http://bugs.python.org/issue9949
- # Luckily, we only need to resolve the file itself being a symlink, not
- # any of its directories, so this will suffice:
- if os.path.islink(path):
- path = os.path.join(os.path.dirname(path), os.readlink(path))
-
- dirname, basename = os.path.split(path)
- tmp_dir = tempfile.mkdtemp(prefix=basename, dir=dirname)
- tmp_path = os.path.join(tmp_dir, basename)
- if text:
- fileobj = io.open(tmp_path, 'w', encoding=encoding, **kwargs)
- else:
- fileobj = io.open(tmp_path, 'wb', **kwargs)
-
- try:
- yield fileobj
- except:
- fileobj.close()
- shutil.rmtree(tmp_dir)
- raise
-
- # Flush to disk
- fileobj.flush()
- os.fsync(fileobj.fileno())
-
- # Written successfully, now rename it
- fileobj.close()
-
- # Copy permission bits, access time, etc.
- try:
- _copy_metadata(path, tmp_path)
- except OSError:
- # e.g. the file didn't already exist. Ignore any failure to copy metadata
- pass
-
- if os.name == 'nt' and os.path.exists(path):
- # Rename over existing file doesn't work on Windows
- os.remove(path)
-
- os.rename(tmp_path, path)
- shutil.rmtree(tmp_dir)
-
+def atomic_writing(*args, **kwargs):
+ """DEPRECATED: moved to IPython.html.services.contents.fileio"""
+ warn("IPython.utils.io.atomic_writing has moved to IPython.html.services.contents.fileio")
+ from IPython.html.services.contents.fileio import atomic_writing
+ return atomic_writing(*args, **kwargs)
def raw_print(*args, **kw):
"""Raw print to sys.__stdout__, otherwise identical interface to print()."""
diff --git a/IPython/utils/tests/test_io.py b/IPython/utils/tests/test_io.py
index 5e75d7d..04c4e9e 100644
--- a/IPython/utils/tests/test_io.py
+++ b/IPython/utils/tests/test_io.py
@@ -18,9 +18,7 @@ import unittest
import nose.tools as nt
from IPython.testing.decorators import skipif, skip_win32
-from IPython.utils.io import (Tee, capture_output, unicode_std_stream,
- atomic_writing,
- )
+from IPython.utils.io import Tee, capture_output
from IPython.utils.py3compat import doctest_refactor_print, PY3
from IPython.utils.tempdir import TemporaryDirectory
@@ -87,115 +85,3 @@ def test_capture_output():
nt.assert_equal(io.stderr, 'hi, stderr\n')
-def test_atomic_writing():
- class CustomExc(Exception): pass
-
- with TemporaryDirectory() as td:
- f1 = os.path.join(td, 'penguin')
- with stdlib_io.open(f1, 'w') as f:
- f.write(u'Before')
-
- if os.name != 'nt':
- os.chmod(f1, 0o701)
- orig_mode = stat.S_IMODE(os.stat(f1).st_mode)
-
- f2 = os.path.join(td, 'flamingo')
- try:
- os.symlink(f1, f2)
- have_symlink = True
- except (AttributeError, NotImplementedError, OSError):
- # AttributeError: Python doesn't support it
- # NotImplementedError: The system doesn't support it
- # OSError: The user lacks the privilege (Windows)
- have_symlink = False
-
- with nt.assert_raises(CustomExc):
- with atomic_writing(f1) as f:
- f.write(u'Failing write')
- raise CustomExc
-
- # Because of the exception, the file should not have been modified
- with stdlib_io.open(f1, 'r') as f:
- nt.assert_equal(f.read(), u'Before')
-
- with atomic_writing(f1) as f:
- f.write(u'Overwritten')
-
- with stdlib_io.open(f1, 'r') as f:
- nt.assert_equal(f.read(), u'Overwritten')
-
- if os.name != 'nt':
- mode = stat.S_IMODE(os.stat(f1).st_mode)
- nt.assert_equal(mode, orig_mode)
-
- if have_symlink:
- # Check that writing over a file preserves a symlink
- with atomic_writing(f2) as f:
- f.write(u'written from symlink')
-
- with stdlib_io.open(f1, 'r') as f:
- nt.assert_equal(f.read(), u'written from symlink')
-
-def _save_umask():
- global umask
- umask = os.umask(0)
- os.umask(umask)
-
-def _restore_umask():
- os.umask(umask)
-
-@skip_win32
-@nt.with_setup(_save_umask, _restore_umask)
-def test_atomic_writing_umask():
- with TemporaryDirectory() as td:
- os.umask(0o022)
- f1 = os.path.join(td, '1')
- with atomic_writing(f1) as f:
- f.write(u'1')
- mode = stat.S_IMODE(os.stat(f1).st_mode)
- nt.assert_equal(mode, 0o644, '{:o} != 644'.format(mode))
-
- os.umask(0o057)
- f2 = os.path.join(td, '2')
- with atomic_writing(f2) as f:
- f.write(u'2')
- mode = stat.S_IMODE(os.stat(f2).st_mode)
- nt.assert_equal(mode, 0o620, '{:o} != 620'.format(mode))
-
-
-def test_atomic_writing_newlines():
- with TemporaryDirectory() as td:
- path = os.path.join(td, 'testfile')
-
- lf = u'a\nb\nc\n'
- plat = lf.replace(u'\n', os.linesep)
- crlf = lf.replace(u'\n', u'\r\n')
-
- # test default
- with stdlib_io.open(path, 'w') as f:
- f.write(lf)
- with stdlib_io.open(path, 'r', newline='') as f:
- read = f.read()
- nt.assert_equal(read, plat)
-
- # test newline=LF
- with stdlib_io.open(path, 'w', newline='\n') as f:
- f.write(lf)
- with stdlib_io.open(path, 'r', newline='') as f:
- read = f.read()
- nt.assert_equal(read, lf)
-
- # test newline=CRLF
- with atomic_writing(path, newline='\r\n') as f:
- f.write(lf)
- with stdlib_io.open(path, 'r', newline='') as f:
- read = f.read()
- nt.assert_equal(read, crlf)
-
- # test newline=no convert
- text = u'crlf\r\ncr\rlf\n'
- with atomic_writing(path, newline='') as f:
- f.write(text)
- with stdlib_io.open(path, 'r', newline='') as f:
- read = f.read()
- nt.assert_equal(read, text)