##// END OF EJS Templates
move utils.io.atomic_writing to contents.fileio
Min RK -
Show More
@@ -0,0 +1,131 b''
1 # encoding: utf-8
2 """Tests for file IO"""
3
4 # Copyright (c) IPython Development Team.
5 # Distributed under the terms of the Modified BSD License.
6
7 import io as stdlib_io
8 import os.path
9 import stat
10
11 import nose.tools as nt
12
13 from IPython.testing.decorators import skip_win32
14 from ..fileio import atomic_writing
15
16 from IPython.utils.tempdir import TemporaryDirectory
17
18 umask = 0
19
20 def test_atomic_writing():
21 class CustomExc(Exception): pass
22
23 with TemporaryDirectory() as td:
24 f1 = os.path.join(td, 'penguin')
25 with stdlib_io.open(f1, 'w') as f:
26 f.write(u'Before')
27
28 if os.name != 'nt':
29 os.chmod(f1, 0o701)
30 orig_mode = stat.S_IMODE(os.stat(f1).st_mode)
31
32 f2 = os.path.join(td, 'flamingo')
33 try:
34 os.symlink(f1, f2)
35 have_symlink = True
36 except (AttributeError, NotImplementedError, OSError):
37 # AttributeError: Python doesn't support it
38 # NotImplementedError: The system doesn't support it
39 # OSError: The user lacks the privilege (Windows)
40 have_symlink = False
41
42 with nt.assert_raises(CustomExc):
43 with atomic_writing(f1) as f:
44 f.write(u'Failing write')
45 raise CustomExc
46
47 # Because of the exception, the file should not have been modified
48 with stdlib_io.open(f1, 'r') as f:
49 nt.assert_equal(f.read(), u'Before')
50
51 with atomic_writing(f1) as f:
52 f.write(u'Overwritten')
53
54 with stdlib_io.open(f1, 'r') as f:
55 nt.assert_equal(f.read(), u'Overwritten')
56
57 if os.name != 'nt':
58 mode = stat.S_IMODE(os.stat(f1).st_mode)
59 nt.assert_equal(mode, orig_mode)
60
61 if have_symlink:
62 # Check that writing over a file preserves a symlink
63 with atomic_writing(f2) as f:
64 f.write(u'written from symlink')
65
66 with stdlib_io.open(f1, 'r') as f:
67 nt.assert_equal(f.read(), u'written from symlink')
68
69 def _save_umask():
70 global umask
71 umask = os.umask(0)
72 os.umask(umask)
73
74 def _restore_umask():
75 os.umask(umask)
76
77 @skip_win32
78 @nt.with_setup(_save_umask, _restore_umask)
79 def test_atomic_writing_umask():
80 with TemporaryDirectory() as td:
81 os.umask(0o022)
82 f1 = os.path.join(td, '1')
83 with atomic_writing(f1) as f:
84 f.write(u'1')
85 mode = stat.S_IMODE(os.stat(f1).st_mode)
86 nt.assert_equal(mode, 0o644, '{:o} != 644'.format(mode))
87
88 os.umask(0o057)
89 f2 = os.path.join(td, '2')
90 with atomic_writing(f2) as f:
91 f.write(u'2')
92 mode = stat.S_IMODE(os.stat(f2).st_mode)
93 nt.assert_equal(mode, 0o620, '{:o} != 620'.format(mode))
94
95
96 def test_atomic_writing_newlines():
97 with TemporaryDirectory() as td:
98 path = os.path.join(td, 'testfile')
99
100 lf = u'a\nb\nc\n'
101 plat = lf.replace(u'\n', os.linesep)
102 crlf = lf.replace(u'\n', u'\r\n')
103
104 # test default
105 with stdlib_io.open(path, 'w') as f:
106 f.write(lf)
107 with stdlib_io.open(path, 'r', newline='') as f:
108 read = f.read()
109 nt.assert_equal(read, plat)
110
111 # test newline=LF
112 with stdlib_io.open(path, 'w', newline='\n') as f:
113 f.write(lf)
114 with stdlib_io.open(path, 'r', newline='') as f:
115 read = f.read()
116 nt.assert_equal(read, lf)
117
118 # test newline=CRLF
119 with atomic_writing(path, newline='\r\n') as f:
120 f.write(lf)
121 with stdlib_io.open(path, 'r', newline='') as f:
122 read = f.read()
123 nt.assert_equal(read, crlf)
124
125 # test newline=no convert
126 text = u'crlf\r\ncr\rlf\n'
127 with atomic_writing(path, newline='') as f:
128 f.write(text)
129 with stdlib_io.open(path, 'r', newline='') as f:
130 read = f.read()
131 nt.assert_equal(read, text)
@@ -1,174 +1,256 b''
1 1 """
2 2 Utilities for file-based Contents/Checkpoints managers.
3 3 """
4 4
5 5 # Copyright (c) IPython Development Team.
6 6 # Distributed under the terms of the Modified BSD License.
7 7
8 8 import base64
9 9 from contextlib import contextmanager
10 10 import errno
11 11 import io
12 12 import os
13 13 import shutil
14 import tempfile
14 15
15 16 from tornado.web import HTTPError
16 17
17 18 from IPython.html.utils import (
18 19 to_api_path,
19 20 to_os_path,
20 21 )
21 22 from IPython import nbformat
22 from IPython.utils.io import atomic_writing
23 23 from IPython.utils.py3compat import str_to_unicode
24 24
25 25
26 def _copy_metadata(src, dst):
27 """Copy the set of metadata we want for atomic_writing.
28
29 Permission bits and flags. We'd like to copy file ownership as well, but we
30 can't do that.
31 """
32 shutil.copymode(src, dst)
33 st = os.stat(src)
34 if hasattr(os, 'chflags') and hasattr(st, 'st_flags'):
35 os.chflags(dst, st.st_flags)
36
37 @contextmanager
38 def atomic_writing(path, text=True, encoding='utf-8', **kwargs):
39 """Context manager to write to a file only if the entire write is successful.
40
41 This works by creating a temporary file in the same directory, and renaming
42 it over the old file if the context is exited without an error. If other
43 file names are hard linked to the target file, this relationship will not be
44 preserved.
45
46 On Windows, there is a small chink in the atomicity: the target file is
47 deleted before renaming the temporary file over it. This appears to be
48 unavoidable.
49
50 Parameters
51 ----------
52 path : str
53 The target file to write to.
54
55 text : bool, optional
56 Whether to open the file in text mode (i.e. to write unicode). Default is
57 True.
58
59 encoding : str, optional
60 The encoding to use for files opened in text mode. Default is UTF-8.
61
62 **kwargs
63 Passed to :func:`io.open`.
64 """
65 # realpath doesn't work on Windows: http://bugs.python.org/issue9949
66 # Luckily, we only need to resolve the file itself being a symlink, not
67 # any of its directories, so this will suffice:
68 if os.path.islink(path):
69 path = os.path.join(os.path.dirname(path), os.readlink(path))
70
71 dirname, basename = os.path.split(path)
72 tmp_dir = tempfile.mkdtemp(prefix=basename, dir=dirname)
73 tmp_path = os.path.join(tmp_dir, basename)
74 if text:
75 fileobj = io.open(tmp_path, 'w', encoding=encoding, **kwargs)
76 else:
77 fileobj = io.open(tmp_path, 'wb', **kwargs)
78
79 try:
80 yield fileobj
81 except:
82 fileobj.close()
83 shutil.rmtree(tmp_dir)
84 raise
85
86 # Flush to disk
87 fileobj.flush()
88 os.fsync(fileobj.fileno())
89
90 # Written successfully, now rename it
91 fileobj.close()
92
93 # Copy permission bits, access time, etc.
94 try:
95 _copy_metadata(path, tmp_path)
96 except OSError:
97 # e.g. the file didn't already exist. Ignore any failure to copy metadata
98 pass
99
100 if os.name == 'nt' and os.path.exists(path):
101 # Rename over existing file doesn't work on Windows
102 os.remove(path)
103
104 os.rename(tmp_path, path)
105 shutil.rmtree(tmp_dir)
106
107
26 108 class FileManagerMixin(object):
27 109 """
28 110 Mixin for ContentsAPI classes that interact with the filesystem.
29 111
30 112 Provides facilities for reading, writing, and copying both notebooks and
31 113 generic files.
32 114
33 115 Shared by FileContentsManager and FileCheckpoints.
34 116
35 117 Note
36 118 ----
37 119 Classes using this mixin must provide the following attributes:
38 120
39 121 root_dir : unicode
40 122 A directory against against which API-style paths are to be resolved.
41 123
42 124 log : logging.Logger
43 125 """
44 126
45 127 @contextmanager
46 128 def open(self, os_path, *args, **kwargs):
47 129 """wrapper around io.open that turns permission errors into 403"""
48 130 with self.perm_to_403(os_path):
49 131 with io.open(os_path, *args, **kwargs) as f:
50 132 yield f
51 133
52 134 @contextmanager
53 135 def atomic_writing(self, os_path, *args, **kwargs):
54 136 """wrapper around atomic_writing that turns permission errors to 403"""
55 137 with self.perm_to_403(os_path):
56 138 with atomic_writing(os_path, *args, **kwargs) as f:
57 139 yield f
58 140
59 141 @contextmanager
60 142 def perm_to_403(self, os_path=''):
61 143 """context manager for turning permission errors into 403."""
62 144 try:
63 145 yield
64 146 except (OSError, IOError) as e:
65 147 if e.errno in {errno.EPERM, errno.EACCES}:
66 148 # make 403 error message without root prefix
67 149 # this may not work perfectly on unicode paths on Python 2,
68 150 # but nobody should be doing that anyway.
69 151 if not os_path:
70 152 os_path = str_to_unicode(e.filename or 'unknown file')
71 153 path = to_api_path(os_path, root=self.root_dir)
72 154 raise HTTPError(403, u'Permission denied: %s' % path)
73 155 else:
74 156 raise
75 157
76 158 def _copy(self, src, dest):
77 159 """copy src to dest
78 160
79 161 like shutil.copy2, but log errors in copystat
80 162 """
81 163 shutil.copyfile(src, dest)
82 164 try:
83 165 shutil.copystat(src, dest)
84 166 except OSError:
85 167 self.log.debug("copystat on %s failed", dest, exc_info=True)
86 168
87 169 def _get_os_path(self, path):
88 170 """Given an API path, return its file system path.
89 171
90 172 Parameters
91 173 ----------
92 174 path : string
93 175 The relative API path to the named file.
94 176
95 177 Returns
96 178 -------
97 179 path : string
98 180 Native, absolute OS path to for a file.
99 181
100 182 Raises
101 183 ------
102 184 404: if path is outside root
103 185 """
104 186 root = os.path.abspath(self.root_dir)
105 187 os_path = to_os_path(path, root)
106 188 if not (os.path.abspath(os_path) + os.path.sep).startswith(root):
107 189 raise HTTPError(404, "%s is outside root contents directory" % path)
108 190 return os_path
109 191
110 192 def _read_notebook(self, os_path, as_version=4):
111 193 """Read a notebook from an os path."""
112 194 with self.open(os_path, 'r', encoding='utf-8') as f:
113 195 try:
114 196 return nbformat.read(f, as_version=as_version)
115 197 except Exception as e:
116 198 raise HTTPError(
117 199 400,
118 200 u"Unreadable Notebook: %s %r" % (os_path, e),
119 201 )
120 202
121 203 def _save_notebook(self, os_path, nb):
122 204 """Save a notebook to an os_path."""
123 205 with self.atomic_writing(os_path, encoding='utf-8') as f:
124 206 nbformat.write(nb, f, version=nbformat.NO_CONVERT)
125 207
126 208 def _read_file(self, os_path, format):
127 209 """Read a non-notebook file.
128 210
129 211 os_path: The path to be read.
130 212 format:
131 213 If 'text', the contents will be decoded as UTF-8.
132 214 If 'base64', the raw bytes contents will be encoded as base64.
133 215 If not specified, try to decode as UTF-8, and fall back to base64
134 216 """
135 217 if not os.path.isfile(os_path):
136 218 raise HTTPError(400, "Cannot read non-file %s" % os_path)
137 219
138 220 with self.open(os_path, 'rb') as f:
139 221 bcontent = f.read()
140 222
141 223 if format is None or format == 'text':
142 224 # Try to interpret as unicode if format is unknown or if unicode
143 225 # was explicitly requested.
144 226 try:
145 227 return bcontent.decode('utf8'), 'text'
146 228 except UnicodeError:
147 229 if format == 'text':
148 230 raise HTTPError(
149 231 400,
150 232 "%s is not UTF-8 encoded" % os_path,
151 233 reason='bad format',
152 234 )
153 235 return base64.encodestring(bcontent).decode('ascii'), 'base64'
154 236
155 237 def _save_file(self, os_path, content, format):
156 238 """Save content of a generic file."""
157 239 if format not in {'text', 'base64'}:
158 240 raise HTTPError(
159 241 400,
160 242 "Must specify format of file contents as 'text' or 'base64'",
161 243 )
162 244 try:
163 245 if format == 'text':
164 246 bcontent = content.encode('utf8')
165 247 else:
166 248 b64_bytes = content.encode('ascii')
167 249 bcontent = base64.decodestring(b64_bytes)
168 250 except Exception as e:
169 251 raise HTTPError(
170 252 400, u'Encoding error saving %s: %s' % (os_path, e)
171 253 )
172 254
173 255 with self.atomic_writing(os_path, text=False) as f:
174 256 f.write(bcontent)
@@ -1,322 +1,246 b''
1 1 # encoding: utf-8
2 2 """
3 3 IO related utilities.
4 4 """
5 5
6 6 # Copyright (c) IPython Development Team.
7 7 # Distributed under the terms of the Modified BSD License.
8 8
9 9 from __future__ import print_function
10 10 from __future__ import absolute_import
11 11
12 12
13 13 import codecs
14 14 from contextlib import contextmanager
15 15 import io
16 16 import os
17 17 import shutil
18 18 import sys
19 19 import tempfile
20 20 import warnings
21 21 from .capture import CapturedIO, capture_output
22 22 from .py3compat import string_types, input, PY3
23 23
24 24
25 25 class IOStream:
26 26
27 27 def __init__(self,stream, fallback=None):
28 28 if not hasattr(stream,'write') or not hasattr(stream,'flush'):
29 29 if fallback is not None:
30 30 stream = fallback
31 31 else:
32 32 raise ValueError("fallback required, but not specified")
33 33 self.stream = stream
34 34 self._swrite = stream.write
35 35
36 36 # clone all methods not overridden:
37 37 def clone(meth):
38 38 return not hasattr(self, meth) and not meth.startswith('_')
39 39 for meth in filter(clone, dir(stream)):
40 40 setattr(self, meth, getattr(stream, meth))
41 41
42 42 def __repr__(self):
43 43 cls = self.__class__
44 44 tpl = '{mod}.{cls}({args})'
45 45 return tpl.format(mod=cls.__module__, cls=cls.__name__, args=self.stream)
46 46
47 47 def write(self,data):
48 48 try:
49 49 self._swrite(data)
50 50 except:
51 51 try:
52 52 # print handles some unicode issues which may trip a plain
53 53 # write() call. Emulate write() by using an empty end
54 54 # argument.
55 55 print(data, end='', file=self.stream)
56 56 except:
57 57 # if we get here, something is seriously broken.
58 58 print('ERROR - failed to write data to stream:', self.stream,
59 59 file=sys.stderr)
60 60
61 61 def writelines(self, lines):
62 62 if isinstance(lines, string_types):
63 63 lines = [lines]
64 64 for line in lines:
65 65 self.write(line)
66 66
67 67 # This class used to have a writeln method, but regular files and streams
68 68 # in Python don't have this method. We need to keep this completely
69 69 # compatible so we removed it.
70 70
71 71 @property
72 72 def closed(self):
73 73 return self.stream.closed
74 74
75 75 def close(self):
76 76 pass
77 77
78 78 # setup stdin/stdout/stderr to sys.stdin/sys.stdout/sys.stderr
79 79 devnull = open(os.devnull, 'w')
80 80 stdin = IOStream(sys.stdin, fallback=devnull)
81 81 stdout = IOStream(sys.stdout, fallback=devnull)
82 82 stderr = IOStream(sys.stderr, fallback=devnull)
83 83
84 84 class IOTerm:
85 85 """ Term holds the file or file-like objects for handling I/O operations.
86 86
87 87 These are normally just sys.stdin, sys.stdout and sys.stderr but for
88 88 Windows they can can replaced to allow editing the strings before they are
89 89 displayed."""
90 90
91 91 # In the future, having IPython channel all its I/O operations through
92 92 # this class will make it easier to embed it into other environments which
93 93 # are not a normal terminal (such as a GUI-based shell)
94 94 def __init__(self, stdin=None, stdout=None, stderr=None):
95 95 mymodule = sys.modules[__name__]
96 96 self.stdin = IOStream(stdin, mymodule.stdin)
97 97 self.stdout = IOStream(stdout, mymodule.stdout)
98 98 self.stderr = IOStream(stderr, mymodule.stderr)
99 99
100 100
101 101 class Tee(object):
102 102 """A class to duplicate an output stream to stdout/err.
103 103
104 104 This works in a manner very similar to the Unix 'tee' command.
105 105
106 106 When the object is closed or deleted, it closes the original file given to
107 107 it for duplication.
108 108 """
109 109 # Inspired by:
110 110 # http://mail.python.org/pipermail/python-list/2007-May/442737.html
111 111
112 112 def __init__(self, file_or_name, mode="w", channel='stdout'):
113 113 """Construct a new Tee object.
114 114
115 115 Parameters
116 116 ----------
117 117 file_or_name : filename or open filehandle (writable)
118 118 File that will be duplicated
119 119
120 120 mode : optional, valid mode for open().
121 121 If a filename was give, open with this mode.
122 122
123 123 channel : str, one of ['stdout', 'stderr']
124 124 """
125 125 if channel not in ['stdout', 'stderr']:
126 126 raise ValueError('Invalid channel spec %s' % channel)
127 127
128 128 if hasattr(file_or_name, 'write') and hasattr(file_or_name, 'seek'):
129 129 self.file = file_or_name
130 130 else:
131 131 self.file = open(file_or_name, mode)
132 132 self.channel = channel
133 133 self.ostream = getattr(sys, channel)
134 134 setattr(sys, channel, self)
135 135 self._closed = False
136 136
137 137 def close(self):
138 138 """Close the file and restore the channel."""
139 139 self.flush()
140 140 setattr(sys, self.channel, self.ostream)
141 141 self.file.close()
142 142 self._closed = True
143 143
144 144 def write(self, data):
145 145 """Write data to both channels."""
146 146 self.file.write(data)
147 147 self.ostream.write(data)
148 148 self.ostream.flush()
149 149
150 150 def flush(self):
151 151 """Flush both channels."""
152 152 self.file.flush()
153 153 self.ostream.flush()
154 154
155 155 def __del__(self):
156 156 if not self._closed:
157 157 self.close()
158 158
159 159
160 160 def ask_yes_no(prompt, default=None, interrupt=None):
161 161 """Asks a question and returns a boolean (y/n) answer.
162 162
163 163 If default is given (one of 'y','n'), it is used if the user input is
164 164 empty. If interrupt is given (one of 'y','n'), it is used if the user
165 165 presses Ctrl-C. Otherwise the question is repeated until an answer is
166 166 given.
167 167
168 168 An EOF is treated as the default answer. If there is no default, an
169 169 exception is raised to prevent infinite loops.
170 170
171 171 Valid answers are: y/yes/n/no (match is not case sensitive)."""
172 172
173 173 answers = {'y':True,'n':False,'yes':True,'no':False}
174 174 ans = None
175 175 while ans not in answers.keys():
176 176 try:
177 177 ans = input(prompt+' ').lower()
178 178 if not ans: # response was an empty string
179 179 ans = default
180 180 except KeyboardInterrupt:
181 181 if interrupt:
182 182 ans = interrupt
183 183 except EOFError:
184 184 if default in answers.keys():
185 185 ans = default
186 186 print()
187 187 else:
188 188 raise
189 189
190 190 return answers[ans]
191 191
192 192
193 193 def temp_pyfile(src, ext='.py'):
194 194 """Make a temporary python file, return filename and filehandle.
195 195
196 196 Parameters
197 197 ----------
198 198 src : string or list of strings (no need for ending newlines if list)
199 199 Source code to be written to the file.
200 200
201 201 ext : optional, string
202 202 Extension for the generated file.
203 203
204 204 Returns
205 205 -------
206 206 (filename, open filehandle)
207 207 It is the caller's responsibility to close the open file and unlink it.
208 208 """
209 209 fname = tempfile.mkstemp(ext)[1]
210 210 f = open(fname,'w')
211 211 f.write(src)
212 212 f.flush()
213 213 return fname, f
214 214
215 def _copy_metadata(src, dst):
216 """Copy the set of metadata we want for atomic_writing.
217
218 Permission bits and flags. We'd like to copy file ownership as well, but we
219 can't do that.
220 """
221 shutil.copymode(src, dst)
222 st = os.stat(src)
223 if hasattr(os, 'chflags') and hasattr(st, 'st_flags'):
224 os.chflags(dst, st.st_flags)
225
226 @contextmanager
227 def atomic_writing(path, text=True, encoding='utf-8', **kwargs):
228 """Context manager to write to a file only if the entire write is successful.
229
230 This works by creating a temporary file in the same directory, and renaming
231 it over the old file if the context is exited without an error. If other
232 file names are hard linked to the target file, this relationship will not be
233 preserved.
234
235 On Windows, there is a small chink in the atomicity: the target file is
236 deleted before renaming the temporary file over it. This appears to be
237 unavoidable.
238
239 Parameters
240 ----------
241 path : str
242 The target file to write to.
243
244 text : bool, optional
245 Whether to open the file in text mode (i.e. to write unicode). Default is
246 True.
247
248 encoding : str, optional
249 The encoding to use for files opened in text mode. Default is UTF-8.
250
251 **kwargs
252 Passed to :func:`io.open`.
253 """
254 # realpath doesn't work on Windows: http://bugs.python.org/issue9949
255 # Luckily, we only need to resolve the file itself being a symlink, not
256 # any of its directories, so this will suffice:
257 if os.path.islink(path):
258 path = os.path.join(os.path.dirname(path), os.readlink(path))
259
260 dirname, basename = os.path.split(path)
261 tmp_dir = tempfile.mkdtemp(prefix=basename, dir=dirname)
262 tmp_path = os.path.join(tmp_dir, basename)
263 if text:
264 fileobj = io.open(tmp_path, 'w', encoding=encoding, **kwargs)
265 else:
266 fileobj = io.open(tmp_path, 'wb', **kwargs)
267
268 try:
269 yield fileobj
270 except:
271 fileobj.close()
272 shutil.rmtree(tmp_dir)
273 raise
274
275 # Flush to disk
276 fileobj.flush()
277 os.fsync(fileobj.fileno())
278
279 # Written successfully, now rename it
280 fileobj.close()
281
282 # Copy permission bits, access time, etc.
283 try:
284 _copy_metadata(path, tmp_path)
285 except OSError:
286 # e.g. the file didn't already exist. Ignore any failure to copy metadata
287 pass
288
289 if os.name == 'nt' and os.path.exists(path):
290 # Rename over existing file doesn't work on Windows
291 os.remove(path)
292
293 os.rename(tmp_path, path)
294 shutil.rmtree(tmp_dir)
295
215 def atomic_writing(*args, **kwargs):
216 """DEPRECATED: moved to IPython.html.services.contents.fileio"""
217 warn("IPython.utils.io.atomic_writing has moved to IPython.html.services.contents.fileio")
218 from IPython.html.services.contents.fileio import atomic_writing
219 return atomic_writing(*args, **kwargs)
296 220
297 221 def raw_print(*args, **kw):
298 222 """Raw print to sys.__stdout__, otherwise identical interface to print()."""
299 223
300 224 print(*args, sep=kw.get('sep', ' '), end=kw.get('end', '\n'),
301 225 file=sys.__stdout__)
302 226 sys.__stdout__.flush()
303 227
304 228
305 229 def raw_print_err(*args, **kw):
306 230 """Raw print to sys.__stderr__, otherwise identical interface to print()."""
307 231
308 232 print(*args, sep=kw.get('sep', ' '), end=kw.get('end', '\n'),
309 233 file=sys.__stderr__)
310 234 sys.__stderr__.flush()
311 235
312 236
313 237 # Short aliases for quick debugging, do NOT use these in production code.
314 238 rprint = raw_print
315 239 rprinte = raw_print_err
316 240
317 241
318 242 def unicode_std_stream(stream='stdout'):
319 243 """DEPRECATED, moved to jupyter_nbconvert.utils.io"""
320 244 warn("IPython.utils.io.unicode_std_stream has moved to jupyter_nbconvert.utils.io")
321 245 from jupyter_nbconvert.utils.io import unicode_std_stream
322 246 return unicode_std_stream(stream)
@@ -1,201 +1,87 b''
1 1 # encoding: utf-8
2 2 """Tests for io.py"""
3 3
4 4 # Copyright (c) IPython Development Team.
5 5 # Distributed under the terms of the Modified BSD License.
6 6
7 7 from __future__ import print_function
8 8 from __future__ import absolute_import
9 9
10 10 import io as stdlib_io
11 11 import os.path
12 12 import stat
13 13 import sys
14 14
15 15 from subprocess import Popen, PIPE
16 16 import unittest
17 17
18 18 import nose.tools as nt
19 19
20 20 from IPython.testing.decorators import skipif, skip_win32
21 from IPython.utils.io import (Tee, capture_output, unicode_std_stream,
22 atomic_writing,
23 )
21 from IPython.utils.io import Tee, capture_output
24 22 from IPython.utils.py3compat import doctest_refactor_print, PY3
25 23 from IPython.utils.tempdir import TemporaryDirectory
26 24
27 25 if PY3:
28 26 from io import StringIO
29 27 else:
30 28 from StringIO import StringIO
31 29
32 30
33 31 def test_tee_simple():
34 32 "Very simple check with stdout only"
35 33 chan = StringIO()
36 34 text = 'Hello'
37 35 tee = Tee(chan, channel='stdout')
38 36 print(text, file=chan)
39 37 nt.assert_equal(chan.getvalue(), text+"\n")
40 38
41 39
42 40 class TeeTestCase(unittest.TestCase):
43 41
44 42 def tchan(self, channel, check='close'):
45 43 trap = StringIO()
46 44 chan = StringIO()
47 45 text = 'Hello'
48 46
49 47 std_ori = getattr(sys, channel)
50 48 setattr(sys, channel, trap)
51 49
52 50 tee = Tee(chan, channel=channel)
53 51 print(text, end='', file=chan)
54 52 setattr(sys, channel, std_ori)
55 53 trap_val = trap.getvalue()
56 54 nt.assert_equal(chan.getvalue(), text)
57 55 if check=='close':
58 56 tee.close()
59 57 else:
60 58 del tee
61 59
62 60 def test(self):
63 61 for chan in ['stdout', 'stderr']:
64 62 for check in ['close', 'del']:
65 63 self.tchan(chan, check)
66 64
67 65 def test_io_init():
68 66 """Test that io.stdin/out/err exist at startup"""
69 67 for name in ('stdin', 'stdout', 'stderr'):
70 68 cmd = doctest_refactor_print("from IPython.utils import io;print io.%s.__class__"%name)
71 69 p = Popen([sys.executable, '-c', cmd],
72 70 stdout=PIPE)
73 71 p.wait()
74 72 classname = p.stdout.read().strip().decode('ascii')
75 73 # __class__ is a reference to the class object in Python 3, so we can't
76 74 # just test for string equality.
77 75 assert 'IPython.utils.io.IOStream' in classname, classname
78 76
79 77 def test_capture_output():
80 78 """capture_output() context works"""
81 79
82 80 with capture_output() as io:
83 81 print('hi, stdout')
84 82 print('hi, stderr', file=sys.stderr)
85 83
86 84 nt.assert_equal(io.stdout, 'hi, stdout\n')
87 85 nt.assert_equal(io.stderr, 'hi, stderr\n')
88 86
89 87
90 def test_atomic_writing():
91 class CustomExc(Exception): pass
92
93 with TemporaryDirectory() as td:
94 f1 = os.path.join(td, 'penguin')
95 with stdlib_io.open(f1, 'w') as f:
96 f.write(u'Before')
97
98 if os.name != 'nt':
99 os.chmod(f1, 0o701)
100 orig_mode = stat.S_IMODE(os.stat(f1).st_mode)
101
102 f2 = os.path.join(td, 'flamingo')
103 try:
104 os.symlink(f1, f2)
105 have_symlink = True
106 except (AttributeError, NotImplementedError, OSError):
107 # AttributeError: Python doesn't support it
108 # NotImplementedError: The system doesn't support it
109 # OSError: The user lacks the privilege (Windows)
110 have_symlink = False
111
112 with nt.assert_raises(CustomExc):
113 with atomic_writing(f1) as f:
114 f.write(u'Failing write')
115 raise CustomExc
116
117 # Because of the exception, the file should not have been modified
118 with stdlib_io.open(f1, 'r') as f:
119 nt.assert_equal(f.read(), u'Before')
120
121 with atomic_writing(f1) as f:
122 f.write(u'Overwritten')
123
124 with stdlib_io.open(f1, 'r') as f:
125 nt.assert_equal(f.read(), u'Overwritten')
126
127 if os.name != 'nt':
128 mode = stat.S_IMODE(os.stat(f1).st_mode)
129 nt.assert_equal(mode, orig_mode)
130
131 if have_symlink:
132 # Check that writing over a file preserves a symlink
133 with atomic_writing(f2) as f:
134 f.write(u'written from symlink')
135
136 with stdlib_io.open(f1, 'r') as f:
137 nt.assert_equal(f.read(), u'written from symlink')
138
139 def _save_umask():
140 global umask
141 umask = os.umask(0)
142 os.umask(umask)
143
144 def _restore_umask():
145 os.umask(umask)
146
147 @skip_win32
148 @nt.with_setup(_save_umask, _restore_umask)
149 def test_atomic_writing_umask():
150 with TemporaryDirectory() as td:
151 os.umask(0o022)
152 f1 = os.path.join(td, '1')
153 with atomic_writing(f1) as f:
154 f.write(u'1')
155 mode = stat.S_IMODE(os.stat(f1).st_mode)
156 nt.assert_equal(mode, 0o644, '{:o} != 644'.format(mode))
157
158 os.umask(0o057)
159 f2 = os.path.join(td, '2')
160 with atomic_writing(f2) as f:
161 f.write(u'2')
162 mode = stat.S_IMODE(os.stat(f2).st_mode)
163 nt.assert_equal(mode, 0o620, '{:o} != 620'.format(mode))
164
165
166 def test_atomic_writing_newlines():
167 with TemporaryDirectory() as td:
168 path = os.path.join(td, 'testfile')
169
170 lf = u'a\nb\nc\n'
171 plat = lf.replace(u'\n', os.linesep)
172 crlf = lf.replace(u'\n', u'\r\n')
173
174 # test default
175 with stdlib_io.open(path, 'w') as f:
176 f.write(lf)
177 with stdlib_io.open(path, 'r', newline='') as f:
178 read = f.read()
179 nt.assert_equal(read, plat)
180
181 # test newline=LF
182 with stdlib_io.open(path, 'w', newline='\n') as f:
183 f.write(lf)
184 with stdlib_io.open(path, 'r', newline='') as f:
185 read = f.read()
186 nt.assert_equal(read, lf)
187
188 # test newline=CRLF
189 with atomic_writing(path, newline='\r\n') as f:
190 f.write(lf)
191 with stdlib_io.open(path, 'r', newline='') as f:
192 read = f.read()
193 nt.assert_equal(read, crlf)
194
195 # test newline=no convert
196 text = u'crlf\r\ncr\rlf\n'
197 with atomic_writing(path, newline='') as f:
198 f.write(text)
199 with stdlib_io.open(path, 'r', newline='') as f:
200 read = f.read()
201 nt.assert_equal(read, text)
General Comments 0
You need to be logged in to leave comments. Login now