##// END OF EJS Templates
move utils.io.atomic_writing to contents.fileio
Min RK -
Show More
@@ -0,0 +1,131 b''
1 # encoding: utf-8
2 """Tests for file IO"""
3
4 # Copyright (c) IPython Development Team.
5 # Distributed under the terms of the Modified BSD License.
6
7 import io as stdlib_io
8 import os.path
9 import stat
10
11 import nose.tools as nt
12
13 from IPython.testing.decorators import skip_win32
14 from ..fileio import atomic_writing
15
16 from IPython.utils.tempdir import TemporaryDirectory
17
18 umask = 0
19
20 def test_atomic_writing():
21 class CustomExc(Exception): pass
22
23 with TemporaryDirectory() as td:
24 f1 = os.path.join(td, 'penguin')
25 with stdlib_io.open(f1, 'w') as f:
26 f.write(u'Before')
27
28 if os.name != 'nt':
29 os.chmod(f1, 0o701)
30 orig_mode = stat.S_IMODE(os.stat(f1).st_mode)
31
32 f2 = os.path.join(td, 'flamingo')
33 try:
34 os.symlink(f1, f2)
35 have_symlink = True
36 except (AttributeError, NotImplementedError, OSError):
37 # AttributeError: Python doesn't support it
38 # NotImplementedError: The system doesn't support it
39 # OSError: The user lacks the privilege (Windows)
40 have_symlink = False
41
42 with nt.assert_raises(CustomExc):
43 with atomic_writing(f1) as f:
44 f.write(u'Failing write')
45 raise CustomExc
46
47 # Because of the exception, the file should not have been modified
48 with stdlib_io.open(f1, 'r') as f:
49 nt.assert_equal(f.read(), u'Before')
50
51 with atomic_writing(f1) as f:
52 f.write(u'Overwritten')
53
54 with stdlib_io.open(f1, 'r') as f:
55 nt.assert_equal(f.read(), u'Overwritten')
56
57 if os.name != 'nt':
58 mode = stat.S_IMODE(os.stat(f1).st_mode)
59 nt.assert_equal(mode, orig_mode)
60
61 if have_symlink:
62 # Check that writing over a file preserves a symlink
63 with atomic_writing(f2) as f:
64 f.write(u'written from symlink')
65
66 with stdlib_io.open(f1, 'r') as f:
67 nt.assert_equal(f.read(), u'written from symlink')
68
69 def _save_umask():
70 global umask
71 umask = os.umask(0)
72 os.umask(umask)
73
74 def _restore_umask():
75 os.umask(umask)
76
77 @skip_win32
78 @nt.with_setup(_save_umask, _restore_umask)
79 def test_atomic_writing_umask():
80 with TemporaryDirectory() as td:
81 os.umask(0o022)
82 f1 = os.path.join(td, '1')
83 with atomic_writing(f1) as f:
84 f.write(u'1')
85 mode = stat.S_IMODE(os.stat(f1).st_mode)
86 nt.assert_equal(mode, 0o644, '{:o} != 644'.format(mode))
87
88 os.umask(0o057)
89 f2 = os.path.join(td, '2')
90 with atomic_writing(f2) as f:
91 f.write(u'2')
92 mode = stat.S_IMODE(os.stat(f2).st_mode)
93 nt.assert_equal(mode, 0o620, '{:o} != 620'.format(mode))
94
95
96 def test_atomic_writing_newlines():
97 with TemporaryDirectory() as td:
98 path = os.path.join(td, 'testfile')
99
100 lf = u'a\nb\nc\n'
101 plat = lf.replace(u'\n', os.linesep)
102 crlf = lf.replace(u'\n', u'\r\n')
103
104 # test default
105 with stdlib_io.open(path, 'w') as f:
106 f.write(lf)
107 with stdlib_io.open(path, 'r', newline='') as f:
108 read = f.read()
109 nt.assert_equal(read, plat)
110
111 # test newline=LF
112 with stdlib_io.open(path, 'w', newline='\n') as f:
113 f.write(lf)
114 with stdlib_io.open(path, 'r', newline='') as f:
115 read = f.read()
116 nt.assert_equal(read, lf)
117
118 # test newline=CRLF
119 with atomic_writing(path, newline='\r\n') as f:
120 f.write(lf)
121 with stdlib_io.open(path, 'r', newline='') as f:
122 read = f.read()
123 nt.assert_equal(read, crlf)
124
125 # test newline=no convert
126 text = u'crlf\r\ncr\rlf\n'
127 with atomic_writing(path, newline='') as f:
128 f.write(text)
129 with stdlib_io.open(path, 'r', newline='') as f:
130 read = f.read()
131 nt.assert_equal(read, text)
@@ -11,6 +11,7 b' import errno'
11 import io
11 import io
12 import os
12 import os
13 import shutil
13 import shutil
14 import tempfile
14
15
15 from tornado.web import HTTPError
16 from tornado.web import HTTPError
16
17
@@ -19,10 +20,91 b' from IPython.html.utils import ('
19 to_os_path,
20 to_os_path,
20 )
21 )
21 from IPython import nbformat
22 from IPython import nbformat
22 from IPython.utils.io import atomic_writing
23 from IPython.utils.py3compat import str_to_unicode
23 from IPython.utils.py3compat import str_to_unicode
24
24
25
25
26 def _copy_metadata(src, dst):
27 """Copy the set of metadata we want for atomic_writing.
28
29 Permission bits and flags. We'd like to copy file ownership as well, but we
30 can't do that.
31 """
32 shutil.copymode(src, dst)
33 st = os.stat(src)
34 if hasattr(os, 'chflags') and hasattr(st, 'st_flags'):
35 os.chflags(dst, st.st_flags)
36
37 @contextmanager
38 def atomic_writing(path, text=True, encoding='utf-8', **kwargs):
39 """Context manager to write to a file only if the entire write is successful.
40
41 This works by creating a temporary file in the same directory, and renaming
42 it over the old file if the context is exited without an error. If other
43 file names are hard linked to the target file, this relationship will not be
44 preserved.
45
46 On Windows, there is a small chink in the atomicity: the target file is
47 deleted before renaming the temporary file over it. This appears to be
48 unavoidable.
49
50 Parameters
51 ----------
52 path : str
53 The target file to write to.
54
55 text : bool, optional
56 Whether to open the file in text mode (i.e. to write unicode). Default is
57 True.
58
59 encoding : str, optional
60 The encoding to use for files opened in text mode. Default is UTF-8.
61
62 **kwargs
63 Passed to :func:`io.open`.
64 """
65 # realpath doesn't work on Windows: http://bugs.python.org/issue9949
66 # Luckily, we only need to resolve the file itself being a symlink, not
67 # any of its directories, so this will suffice:
68 if os.path.islink(path):
69 path = os.path.join(os.path.dirname(path), os.readlink(path))
70
71 dirname, basename = os.path.split(path)
72 tmp_dir = tempfile.mkdtemp(prefix=basename, dir=dirname)
73 tmp_path = os.path.join(tmp_dir, basename)
74 if text:
75 fileobj = io.open(tmp_path, 'w', encoding=encoding, **kwargs)
76 else:
77 fileobj = io.open(tmp_path, 'wb', **kwargs)
78
79 try:
80 yield fileobj
81 except:
82 fileobj.close()
83 shutil.rmtree(tmp_dir)
84 raise
85
86 # Flush to disk
87 fileobj.flush()
88 os.fsync(fileobj.fileno())
89
90 # Written successfully, now rename it
91 fileobj.close()
92
93 # Copy permission bits, access time, etc.
94 try:
95 _copy_metadata(path, tmp_path)
96 except OSError:
97 # e.g. the file didn't already exist. Ignore any failure to copy metadata
98 pass
99
100 if os.name == 'nt' and os.path.exists(path):
101 # Rename over existing file doesn't work on Windows
102 os.remove(path)
103
104 os.rename(tmp_path, path)
105 shutil.rmtree(tmp_dir)
106
107
26 class FileManagerMixin(object):
108 class FileManagerMixin(object):
27 """
109 """
28 Mixin for ContentsAPI classes that interact with the filesystem.
110 Mixin for ContentsAPI classes that interact with the filesystem.
@@ -212,87 +212,11 b" def temp_pyfile(src, ext='.py'):"
212 f.flush()
212 f.flush()
213 return fname, f
213 return fname, f
214
214
215 def _copy_metadata(src, dst):
215 def atomic_writing(*args, **kwargs):
216 """Copy the set of metadata we want for atomic_writing.
216 """DEPRECATED: moved to IPython.html.services.contents.fileio"""
217
217 warn("IPython.utils.io.atomic_writing has moved to IPython.html.services.contents.fileio")
218 Permission bits and flags. We'd like to copy file ownership as well, but we
218 from IPython.html.services.contents.fileio import atomic_writing
219 can't do that.
219 return atomic_writing(*args, **kwargs)
220 """
221 shutil.copymode(src, dst)
222 st = os.stat(src)
223 if hasattr(os, 'chflags') and hasattr(st, 'st_flags'):
224 os.chflags(dst, st.st_flags)
225
226 @contextmanager
227 def atomic_writing(path, text=True, encoding='utf-8', **kwargs):
228 """Context manager to write to a file only if the entire write is successful.
229
230 This works by creating a temporary file in the same directory, and renaming
231 it over the old file if the context is exited without an error. If other
232 file names are hard linked to the target file, this relationship will not be
233 preserved.
234
235 On Windows, there is a small chink in the atomicity: the target file is
236 deleted before renaming the temporary file over it. This appears to be
237 unavoidable.
238
239 Parameters
240 ----------
241 path : str
242 The target file to write to.
243
244 text : bool, optional
245 Whether to open the file in text mode (i.e. to write unicode). Default is
246 True.
247
248 encoding : str, optional
249 The encoding to use for files opened in text mode. Default is UTF-8.
250
251 **kwargs
252 Passed to :func:`io.open`.
253 """
254 # realpath doesn't work on Windows: http://bugs.python.org/issue9949
255 # Luckily, we only need to resolve the file itself being a symlink, not
256 # any of its directories, so this will suffice:
257 if os.path.islink(path):
258 path = os.path.join(os.path.dirname(path), os.readlink(path))
259
260 dirname, basename = os.path.split(path)
261 tmp_dir = tempfile.mkdtemp(prefix=basename, dir=dirname)
262 tmp_path = os.path.join(tmp_dir, basename)
263 if text:
264 fileobj = io.open(tmp_path, 'w', encoding=encoding, **kwargs)
265 else:
266 fileobj = io.open(tmp_path, 'wb', **kwargs)
267
268 try:
269 yield fileobj
270 except:
271 fileobj.close()
272 shutil.rmtree(tmp_dir)
273 raise
274
275 # Flush to disk
276 fileobj.flush()
277 os.fsync(fileobj.fileno())
278
279 # Written successfully, now rename it
280 fileobj.close()
281
282 # Copy permission bits, access time, etc.
283 try:
284 _copy_metadata(path, tmp_path)
285 except OSError:
286 # e.g. the file didn't already exist. Ignore any failure to copy metadata
287 pass
288
289 if os.name == 'nt' and os.path.exists(path):
290 # Rename over existing file doesn't work on Windows
291 os.remove(path)
292
293 os.rename(tmp_path, path)
294 shutil.rmtree(tmp_dir)
295
296
220
297 def raw_print(*args, **kw):
221 def raw_print(*args, **kw):
298 """Raw print to sys.__stdout__, otherwise identical interface to print()."""
222 """Raw print to sys.__stdout__, otherwise identical interface to print()."""
@@ -18,9 +18,7 b' import unittest'
18 import nose.tools as nt
18 import nose.tools as nt
19
19
20 from IPython.testing.decorators import skipif, skip_win32
20 from IPython.testing.decorators import skipif, skip_win32
21 from IPython.utils.io import (Tee, capture_output, unicode_std_stream,
21 from IPython.utils.io import Tee, capture_output
22 atomic_writing,
23 )
24 from IPython.utils.py3compat import doctest_refactor_print, PY3
22 from IPython.utils.py3compat import doctest_refactor_print, PY3
25 from IPython.utils.tempdir import TemporaryDirectory
23 from IPython.utils.tempdir import TemporaryDirectory
26
24
@@ -87,115 +85,3 b' def test_capture_output():'
87 nt.assert_equal(io.stderr, 'hi, stderr\n')
85 nt.assert_equal(io.stderr, 'hi, stderr\n')
88
86
89
87
90 def test_atomic_writing():
91 class CustomExc(Exception): pass
92
93 with TemporaryDirectory() as td:
94 f1 = os.path.join(td, 'penguin')
95 with stdlib_io.open(f1, 'w') as f:
96 f.write(u'Before')
97
98 if os.name != 'nt':
99 os.chmod(f1, 0o701)
100 orig_mode = stat.S_IMODE(os.stat(f1).st_mode)
101
102 f2 = os.path.join(td, 'flamingo')
103 try:
104 os.symlink(f1, f2)
105 have_symlink = True
106 except (AttributeError, NotImplementedError, OSError):
107 # AttributeError: Python doesn't support it
108 # NotImplementedError: The system doesn't support it
109 # OSError: The user lacks the privilege (Windows)
110 have_symlink = False
111
112 with nt.assert_raises(CustomExc):
113 with atomic_writing(f1) as f:
114 f.write(u'Failing write')
115 raise CustomExc
116
117 # Because of the exception, the file should not have been modified
118 with stdlib_io.open(f1, 'r') as f:
119 nt.assert_equal(f.read(), u'Before')
120
121 with atomic_writing(f1) as f:
122 f.write(u'Overwritten')
123
124 with stdlib_io.open(f1, 'r') as f:
125 nt.assert_equal(f.read(), u'Overwritten')
126
127 if os.name != 'nt':
128 mode = stat.S_IMODE(os.stat(f1).st_mode)
129 nt.assert_equal(mode, orig_mode)
130
131 if have_symlink:
132 # Check that writing over a file preserves a symlink
133 with atomic_writing(f2) as f:
134 f.write(u'written from symlink')
135
136 with stdlib_io.open(f1, 'r') as f:
137 nt.assert_equal(f.read(), u'written from symlink')
138
139 def _save_umask():
140 global umask
141 umask = os.umask(0)
142 os.umask(umask)
143
144 def _restore_umask():
145 os.umask(umask)
146
147 @skip_win32
148 @nt.with_setup(_save_umask, _restore_umask)
149 def test_atomic_writing_umask():
150 with TemporaryDirectory() as td:
151 os.umask(0o022)
152 f1 = os.path.join(td, '1')
153 with atomic_writing(f1) as f:
154 f.write(u'1')
155 mode = stat.S_IMODE(os.stat(f1).st_mode)
156 nt.assert_equal(mode, 0o644, '{:o} != 644'.format(mode))
157
158 os.umask(0o057)
159 f2 = os.path.join(td, '2')
160 with atomic_writing(f2) as f:
161 f.write(u'2')
162 mode = stat.S_IMODE(os.stat(f2).st_mode)
163 nt.assert_equal(mode, 0o620, '{:o} != 620'.format(mode))
164
165
166 def test_atomic_writing_newlines():
167 with TemporaryDirectory() as td:
168 path = os.path.join(td, 'testfile')
169
170 lf = u'a\nb\nc\n'
171 plat = lf.replace(u'\n', os.linesep)
172 crlf = lf.replace(u'\n', u'\r\n')
173
174 # test default
175 with stdlib_io.open(path, 'w') as f:
176 f.write(lf)
177 with stdlib_io.open(path, 'r', newline='') as f:
178 read = f.read()
179 nt.assert_equal(read, plat)
180
181 # test newline=LF
182 with stdlib_io.open(path, 'w', newline='\n') as f:
183 f.write(lf)
184 with stdlib_io.open(path, 'r', newline='') as f:
185 read = f.read()
186 nt.assert_equal(read, lf)
187
188 # test newline=CRLF
189 with atomic_writing(path, newline='\r\n') as f:
190 f.write(lf)
191 with stdlib_io.open(path, 'r', newline='') as f:
192 read = f.read()
193 nt.assert_equal(read, crlf)
194
195 # test newline=no convert
196 text = u'crlf\r\ncr\rlf\n'
197 with atomic_writing(path, newline='') as f:
198 f.write(text)
199 with stdlib_io.open(path, 'r', newline='') as f:
200 read = f.read()
201 nt.assert_equal(read, text)
General Comments 0
You need to be logged in to leave comments. Login now