##// END OF EJS Templates
contents: double check that os_path is always within root...
Min RK -
Show More
@@ -1,166 +1,174 b''
1 """
1 """
2 Utilities for file-based Contents/Checkpoints managers.
2 Utilities for file-based Contents/Checkpoints managers.
3 """
3 """
4
4
5 # Copyright (c) IPython Development Team.
5 # Copyright (c) IPython Development Team.
6 # Distributed under the terms of the Modified BSD License.
6 # Distributed under the terms of the Modified BSD License.
7
7
8 import base64
8 import base64
9 from contextlib import contextmanager
9 from contextlib import contextmanager
10 import errno
10 import errno
11 import io
11 import io
12 import os
12 import os
13 import shutil
13 import shutil
14
14
15 from tornado.web import HTTPError
15 from tornado.web import HTTPError
16
16
17 from IPython.html.utils import (
17 from IPython.html.utils import (
18 to_api_path,
18 to_api_path,
19 to_os_path,
19 to_os_path,
20 )
20 )
21 from IPython import nbformat
21 from IPython import nbformat
22 from IPython.utils.io import atomic_writing
22 from IPython.utils.io import atomic_writing
23 from IPython.utils.py3compat import str_to_unicode
23 from IPython.utils.py3compat import str_to_unicode
24
24
25
25
26 class FileManagerMixin(object):
26 class FileManagerMixin(object):
27 """
27 """
28 Mixin for ContentsAPI classes that interact with the filesystem.
28 Mixin for ContentsAPI classes that interact with the filesystem.
29
29
30 Provides facilities for reading, writing, and copying both notebooks and
30 Provides facilities for reading, writing, and copying both notebooks and
31 generic files.
31 generic files.
32
32
33 Shared by FileContentsManager and FileCheckpoints.
33 Shared by FileContentsManager and FileCheckpoints.
34
34
35 Note
35 Note
36 ----
36 ----
37 Classes using this mixin must provide the following attributes:
37 Classes using this mixin must provide the following attributes:
38
38
39 root_dir : unicode
39 root_dir : unicode
40 A directory against against which API-style paths are to be resolved.
40 A directory against against which API-style paths are to be resolved.
41
41
42 log : logging.Logger
42 log : logging.Logger
43 """
43 """
44
44
45 @contextmanager
45 @contextmanager
46 def open(self, os_path, *args, **kwargs):
46 def open(self, os_path, *args, **kwargs):
47 """wrapper around io.open that turns permission errors into 403"""
47 """wrapper around io.open that turns permission errors into 403"""
48 with self.perm_to_403(os_path):
48 with self.perm_to_403(os_path):
49 with io.open(os_path, *args, **kwargs) as f:
49 with io.open(os_path, *args, **kwargs) as f:
50 yield f
50 yield f
51
51
52 @contextmanager
52 @contextmanager
53 def atomic_writing(self, os_path, *args, **kwargs):
53 def atomic_writing(self, os_path, *args, **kwargs):
54 """wrapper around atomic_writing that turns permission errors to 403"""
54 """wrapper around atomic_writing that turns permission errors to 403"""
55 with self.perm_to_403(os_path):
55 with self.perm_to_403(os_path):
56 with atomic_writing(os_path, *args, **kwargs) as f:
56 with atomic_writing(os_path, *args, **kwargs) as f:
57 yield f
57 yield f
58
58
59 @contextmanager
59 @contextmanager
60 def perm_to_403(self, os_path=''):
60 def perm_to_403(self, os_path=''):
61 """context manager for turning permission errors into 403."""
61 """context manager for turning permission errors into 403."""
62 try:
62 try:
63 yield
63 yield
64 except (OSError, IOError) as e:
64 except (OSError, IOError) as e:
65 if e.errno in {errno.EPERM, errno.EACCES}:
65 if e.errno in {errno.EPERM, errno.EACCES}:
66 # make 403 error message without root prefix
66 # make 403 error message without root prefix
67 # this may not work perfectly on unicode paths on Python 2,
67 # this may not work perfectly on unicode paths on Python 2,
68 # but nobody should be doing that anyway.
68 # but nobody should be doing that anyway.
69 if not os_path:
69 if not os_path:
70 os_path = str_to_unicode(e.filename or 'unknown file')
70 os_path = str_to_unicode(e.filename or 'unknown file')
71 path = to_api_path(os_path, root=self.root_dir)
71 path = to_api_path(os_path, root=self.root_dir)
72 raise HTTPError(403, u'Permission denied: %s' % path)
72 raise HTTPError(403, u'Permission denied: %s' % path)
73 else:
73 else:
74 raise
74 raise
75
75
76 def _copy(self, src, dest):
76 def _copy(self, src, dest):
77 """copy src to dest
77 """copy src to dest
78
78
79 like shutil.copy2, but log errors in copystat
79 like shutil.copy2, but log errors in copystat
80 """
80 """
81 shutil.copyfile(src, dest)
81 shutil.copyfile(src, dest)
82 try:
82 try:
83 shutil.copystat(src, dest)
83 shutil.copystat(src, dest)
84 except OSError:
84 except OSError:
85 self.log.debug("copystat on %s failed", dest, exc_info=True)
85 self.log.debug("copystat on %s failed", dest, exc_info=True)
86
86
87 def _get_os_path(self, path):
87 def _get_os_path(self, path):
88 """Given an API path, return its file system path.
88 """Given an API path, return its file system path.
89
89
90 Parameters
90 Parameters
91 ----------
91 ----------
92 path : string
92 path : string
93 The relative API path to the named file.
93 The relative API path to the named file.
94
94
95 Returns
95 Returns
96 -------
96 -------
97 path : string
97 path : string
98 Native, absolute OS path to for a file.
98 Native, absolute OS path to for a file.
99
100 Raises
101 ------
102 404: if path is outside root
99 """
103 """
100 return to_os_path(path, self.root_dir)
104 root = os.path.abspath(self.root_dir)
105 os_path = to_os_path(path, root)
106 if not (os.path.abspath(os_path) + os.path.sep).startswith(root):
107 raise HTTPError(404, "%s is outside root contents directory" % path)
108 return os_path
101
109
102 def _read_notebook(self, os_path, as_version=4):
110 def _read_notebook(self, os_path, as_version=4):
103 """Read a notebook from an os path."""
111 """Read a notebook from an os path."""
104 with self.open(os_path, 'r', encoding='utf-8') as f:
112 with self.open(os_path, 'r', encoding='utf-8') as f:
105 try:
113 try:
106 return nbformat.read(f, as_version=as_version)
114 return nbformat.read(f, as_version=as_version)
107 except Exception as e:
115 except Exception as e:
108 raise HTTPError(
116 raise HTTPError(
109 400,
117 400,
110 u"Unreadable Notebook: %s %r" % (os_path, e),
118 u"Unreadable Notebook: %s %r" % (os_path, e),
111 )
119 )
112
120
113 def _save_notebook(self, os_path, nb):
121 def _save_notebook(self, os_path, nb):
114 """Save a notebook to an os_path."""
122 """Save a notebook to an os_path."""
115 with self.atomic_writing(os_path, encoding='utf-8') as f:
123 with self.atomic_writing(os_path, encoding='utf-8') as f:
116 nbformat.write(nb, f, version=nbformat.NO_CONVERT)
124 nbformat.write(nb, f, version=nbformat.NO_CONVERT)
117
125
118 def _read_file(self, os_path, format):
126 def _read_file(self, os_path, format):
119 """Read a non-notebook file.
127 """Read a non-notebook file.
120
128
121 os_path: The path to be read.
129 os_path: The path to be read.
122 format:
130 format:
123 If 'text', the contents will be decoded as UTF-8.
131 If 'text', the contents will be decoded as UTF-8.
124 If 'base64', the raw bytes contents will be encoded as base64.
132 If 'base64', the raw bytes contents will be encoded as base64.
125 If not specified, try to decode as UTF-8, and fall back to base64
133 If not specified, try to decode as UTF-8, and fall back to base64
126 """
134 """
127 if not os.path.isfile(os_path):
135 if not os.path.isfile(os_path):
128 raise HTTPError(400, "Cannot read non-file %s" % os_path)
136 raise HTTPError(400, "Cannot read non-file %s" % os_path)
129
137
130 with self.open(os_path, 'rb') as f:
138 with self.open(os_path, 'rb') as f:
131 bcontent = f.read()
139 bcontent = f.read()
132
140
133 if format is None or format == 'text':
141 if format is None or format == 'text':
134 # Try to interpret as unicode if format is unknown or if unicode
142 # Try to interpret as unicode if format is unknown or if unicode
135 # was explicitly requested.
143 # was explicitly requested.
136 try:
144 try:
137 return bcontent.decode('utf8'), 'text'
145 return bcontent.decode('utf8'), 'text'
138 except UnicodeError:
146 except UnicodeError:
139 if format == 'text':
147 if format == 'text':
140 raise HTTPError(
148 raise HTTPError(
141 400,
149 400,
142 "%s is not UTF-8 encoded" % os_path,
150 "%s is not UTF-8 encoded" % os_path,
143 reason='bad format',
151 reason='bad format',
144 )
152 )
145 return base64.encodestring(bcontent).decode('ascii'), 'base64'
153 return base64.encodestring(bcontent).decode('ascii'), 'base64'
146
154
147 def _save_file(self, os_path, content, format):
155 def _save_file(self, os_path, content, format):
148 """Save content of a generic file."""
156 """Save content of a generic file."""
149 if format not in {'text', 'base64'}:
157 if format not in {'text', 'base64'}:
150 raise HTTPError(
158 raise HTTPError(
151 400,
159 400,
152 "Must specify format of file contents as 'text' or 'base64'",
160 "Must specify format of file contents as 'text' or 'base64'",
153 )
161 )
154 try:
162 try:
155 if format == 'text':
163 if format == 'text':
156 bcontent = content.encode('utf8')
164 bcontent = content.encode('utf8')
157 else:
165 else:
158 b64_bytes = content.encode('ascii')
166 b64_bytes = content.encode('ascii')
159 bcontent = base64.decodestring(b64_bytes)
167 bcontent = base64.decodestring(b64_bytes)
160 except Exception as e:
168 except Exception as e:
161 raise HTTPError(
169 raise HTTPError(
162 400, u'Encoding error saving %s: %s' % (os_path, e)
170 400, u'Encoding error saving %s: %s' % (os_path, e)
163 )
171 )
164
172
165 with self.atomic_writing(os_path, text=False) as f:
173 with self.atomic_writing(os_path, text=False) as f:
166 f.write(bcontent)
174 f.write(bcontent)
@@ -1,472 +1,473 b''
1 """A contents manager that uses the local file system for storage."""
1 """A contents manager that uses the local file system for storage."""
2
2
3 # Copyright (c) IPython Development Team.
3 # Copyright (c) IPython Development Team.
4 # Distributed under the terms of the Modified BSD License.
4 # Distributed under the terms of the Modified BSD License.
5
5
6
6
7 import io
7 import io
8 import os
8 import os
9 import shutil
9 import shutil
10 import mimetypes
10 import mimetypes
11
11
12 from tornado import web
12 from tornado import web
13
13
14 from .filecheckpoints import FileCheckpoints
14 from .filecheckpoints import FileCheckpoints
15 from .fileio import FileManagerMixin
15 from .fileio import FileManagerMixin
16 from .manager import ContentsManager
16 from .manager import ContentsManager
17
17
18 from IPython import nbformat
18 from IPython import nbformat
19 from IPython.utils.importstring import import_item
19 from IPython.utils.importstring import import_item
20 from IPython.utils.traitlets import Any, Unicode, Bool, TraitError
20 from IPython.utils.traitlets import Any, Unicode, Bool, TraitError
21 from IPython.utils.py3compat import getcwd, string_types
21 from IPython.utils.py3compat import getcwd, string_types
22 from IPython.utils import tz
22 from IPython.utils import tz
23 from IPython.html.utils import (
23 from IPython.html.utils import (
24 is_hidden,
24 is_hidden,
25 to_api_path,
25 to_api_path,
26 )
26 )
27
27
28 _script_exporter = None
28 _script_exporter = None
29
29
30
30
31 def _post_save_script(model, os_path, contents_manager, **kwargs):
31 def _post_save_script(model, os_path, contents_manager, **kwargs):
32 """convert notebooks to Python script after save with nbconvert
32 """convert notebooks to Python script after save with nbconvert
33
33
34 replaces `ipython notebook --script`
34 replaces `ipython notebook --script`
35 """
35 """
36 from IPython.nbconvert.exporters.script import ScriptExporter
36 from IPython.nbconvert.exporters.script import ScriptExporter
37
37
38 if model['type'] != 'notebook':
38 if model['type'] != 'notebook':
39 return
39 return
40
40
41 global _script_exporter
41 global _script_exporter
42 if _script_exporter is None:
42 if _script_exporter is None:
43 _script_exporter = ScriptExporter(parent=contents_manager)
43 _script_exporter = ScriptExporter(parent=contents_manager)
44 log = contents_manager.log
44 log = contents_manager.log
45
45
46 base, ext = os.path.splitext(os_path)
46 base, ext = os.path.splitext(os_path)
47 py_fname = base + '.py'
47 py_fname = base + '.py'
48 script, resources = _script_exporter.from_filename(os_path)
48 script, resources = _script_exporter.from_filename(os_path)
49 script_fname = base + resources.get('output_extension', '.txt')
49 script_fname = base + resources.get('output_extension', '.txt')
50 log.info("Saving script /%s", to_api_path(script_fname, contents_manager.root_dir))
50 log.info("Saving script /%s", to_api_path(script_fname, contents_manager.root_dir))
51 with io.open(script_fname, 'w', encoding='utf-8') as f:
51 with io.open(script_fname, 'w', encoding='utf-8') as f:
52 f.write(script)
52 f.write(script)
53
53
54
54
55 class FileContentsManager(FileManagerMixin, ContentsManager):
55 class FileContentsManager(FileManagerMixin, ContentsManager):
56
56
57 root_dir = Unicode(config=True)
57 root_dir = Unicode(config=True)
58
58
59 def _root_dir_default(self):
59 def _root_dir_default(self):
60 try:
60 try:
61 return self.parent.notebook_dir
61 return self.parent.notebook_dir
62 except AttributeError:
62 except AttributeError:
63 return getcwd()
63 return getcwd()
64
64
65 save_script = Bool(False, config=True, help='DEPRECATED, use post_save_hook')
65 save_script = Bool(False, config=True, help='DEPRECATED, use post_save_hook')
66 def _save_script_changed(self):
66 def _save_script_changed(self):
67 self.log.warn("""
67 self.log.warn("""
68 `--script` is deprecated. You can trigger nbconvert via pre- or post-save hooks:
68 `--script` is deprecated. You can trigger nbconvert via pre- or post-save hooks:
69
69
70 ContentsManager.pre_save_hook
70 ContentsManager.pre_save_hook
71 FileContentsManager.post_save_hook
71 FileContentsManager.post_save_hook
72
72
73 A post-save hook has been registered that calls:
73 A post-save hook has been registered that calls:
74
74
75 ipython nbconvert --to script [notebook]
75 ipython nbconvert --to script [notebook]
76
76
77 which behaves similarly to `--script`.
77 which behaves similarly to `--script`.
78 """)
78 """)
79
79
80 self.post_save_hook = _post_save_script
80 self.post_save_hook = _post_save_script
81
81
82 post_save_hook = Any(None, config=True,
82 post_save_hook = Any(None, config=True,
83 help="""Python callable or importstring thereof
83 help="""Python callable or importstring thereof
84
84
85 to be called on the path of a file just saved.
85 to be called on the path of a file just saved.
86
86
87 This can be used to process the file on disk,
87 This can be used to process the file on disk,
88 such as converting the notebook to a script or HTML via nbconvert.
88 such as converting the notebook to a script or HTML via nbconvert.
89
89
90 It will be called as (all arguments passed by keyword)::
90 It will be called as (all arguments passed by keyword)::
91
91
92 hook(os_path=os_path, model=model, contents_manager=instance)
92 hook(os_path=os_path, model=model, contents_manager=instance)
93
93
94 - path: the filesystem path to the file just written
94 - path: the filesystem path to the file just written
95 - model: the model representing the file
95 - model: the model representing the file
96 - contents_manager: this ContentsManager instance
96 - contents_manager: this ContentsManager instance
97 """
97 """
98 )
98 )
99 def _post_save_hook_changed(self, name, old, new):
99 def _post_save_hook_changed(self, name, old, new):
100 if new and isinstance(new, string_types):
100 if new and isinstance(new, string_types):
101 self.post_save_hook = import_item(self.post_save_hook)
101 self.post_save_hook = import_item(self.post_save_hook)
102 elif new:
102 elif new:
103 if not callable(new):
103 if not callable(new):
104 raise TraitError("post_save_hook must be callable")
104 raise TraitError("post_save_hook must be callable")
105
105
106 def run_post_save_hook(self, model, os_path):
106 def run_post_save_hook(self, model, os_path):
107 """Run the post-save hook if defined, and log errors"""
107 """Run the post-save hook if defined, and log errors"""
108 if self.post_save_hook:
108 if self.post_save_hook:
109 try:
109 try:
110 self.log.debug("Running post-save hook on %s", os_path)
110 self.log.debug("Running post-save hook on %s", os_path)
111 self.post_save_hook(os_path=os_path, model=model, contents_manager=self)
111 self.post_save_hook(os_path=os_path, model=model, contents_manager=self)
112 except Exception:
112 except Exception:
113 self.log.error("Post-save hook failed on %s", os_path, exc_info=True)
113 self.log.error("Post-save hook failed on %s", os_path, exc_info=True)
114
114
115 def _root_dir_changed(self, name, old, new):
115 def _root_dir_changed(self, name, old, new):
116 """Do a bit of validation of the root_dir."""
116 """Do a bit of validation of the root_dir."""
117 if not os.path.isabs(new):
117 if not os.path.isabs(new):
118 # If we receive a non-absolute path, make it absolute.
118 # If we receive a non-absolute path, make it absolute.
119 self.root_dir = os.path.abspath(new)
119 self.root_dir = os.path.abspath(new)
120 return
120 return
121 if not os.path.isdir(new):
121 if not os.path.isdir(new):
122 raise TraitError("%r is not a directory" % new)
122 raise TraitError("%r is not a directory" % new)
123
123
124 def _checkpoints_class_default(self):
124 def _checkpoints_class_default(self):
125 return FileCheckpoints
125 return FileCheckpoints
126
126
127 def is_hidden(self, path):
127 def is_hidden(self, path):
128 """Does the API style path correspond to a hidden directory or file?
128 """Does the API style path correspond to a hidden directory or file?
129
129
130 Parameters
130 Parameters
131 ----------
131 ----------
132 path : string
132 path : string
133 The path to check. This is an API path (`/` separated,
133 The path to check. This is an API path (`/` separated,
134 relative to root_dir).
134 relative to root_dir).
135
135
136 Returns
136 Returns
137 -------
137 -------
138 hidden : bool
138 hidden : bool
139 Whether the path exists and is hidden.
139 Whether the path exists and is hidden.
140 """
140 """
141 path = path.strip('/')
141 path = path.strip('/')
142 os_path = self._get_os_path(path=path)
142 os_path = self._get_os_path(path=path)
143 return is_hidden(os_path, self.root_dir)
143 return is_hidden(os_path, self.root_dir)
144
144
145 def file_exists(self, path):
145 def file_exists(self, path):
146 """Returns True if the file exists, else returns False.
146 """Returns True if the file exists, else returns False.
147
147
148 API-style wrapper for os.path.isfile
148 API-style wrapper for os.path.isfile
149
149
150 Parameters
150 Parameters
151 ----------
151 ----------
152 path : string
152 path : string
153 The relative path to the file (with '/' as separator)
153 The relative path to the file (with '/' as separator)
154
154
155 Returns
155 Returns
156 -------
156 -------
157 exists : bool
157 exists : bool
158 Whether the file exists.
158 Whether the file exists.
159 """
159 """
160 path = path.strip('/')
160 path = path.strip('/')
161 os_path = self._get_os_path(path)
161 os_path = self._get_os_path(path)
162 return os.path.isfile(os_path)
162 return os.path.isfile(os_path)
163
163
164 def dir_exists(self, path):
164 def dir_exists(self, path):
165 """Does the API-style path refer to an extant directory?
165 """Does the API-style path refer to an extant directory?
166
166
167 API-style wrapper for os.path.isdir
167 API-style wrapper for os.path.isdir
168
168
169 Parameters
169 Parameters
170 ----------
170 ----------
171 path : string
171 path : string
172 The path to check. This is an API path (`/` separated,
172 The path to check. This is an API path (`/` separated,
173 relative to root_dir).
173 relative to root_dir).
174
174
175 Returns
175 Returns
176 -------
176 -------
177 exists : bool
177 exists : bool
178 Whether the path is indeed a directory.
178 Whether the path is indeed a directory.
179 """
179 """
180 path = path.strip('/')
180 path = path.strip('/')
181 os_path = self._get_os_path(path=path)
181 os_path = self._get_os_path(path=path)
182 return os.path.isdir(os_path)
182 return os.path.isdir(os_path)
183
183
184 def exists(self, path):
184 def exists(self, path):
185 """Returns True if the path exists, else returns False.
185 """Returns True if the path exists, else returns False.
186
186
187 API-style wrapper for os.path.exists
187 API-style wrapper for os.path.exists
188
188
189 Parameters
189 Parameters
190 ----------
190 ----------
191 path : string
191 path : string
192 The API path to the file (with '/' as separator)
192 The API path to the file (with '/' as separator)
193
193
194 Returns
194 Returns
195 -------
195 -------
196 exists : bool
196 exists : bool
197 Whether the target exists.
197 Whether the target exists.
198 """
198 """
199 path = path.strip('/')
199 path = path.strip('/')
200 os_path = self._get_os_path(path=path)
200 os_path = self._get_os_path(path=path)
201 return os.path.exists(os_path)
201 return os.path.exists(os_path)
202
202
203 def _base_model(self, path):
203 def _base_model(self, path):
204 """Build the common base of a contents model"""
204 """Build the common base of a contents model"""
205 os_path = self._get_os_path(path)
205 os_path = self._get_os_path(path)
206 info = os.stat(os_path)
206 info = os.stat(os_path)
207 last_modified = tz.utcfromtimestamp(info.st_mtime)
207 last_modified = tz.utcfromtimestamp(info.st_mtime)
208 created = tz.utcfromtimestamp(info.st_ctime)
208 created = tz.utcfromtimestamp(info.st_ctime)
209 # Create the base model.
209 # Create the base model.
210 model = {}
210 model = {}
211 model['name'] = path.rsplit('/', 1)[-1]
211 model['name'] = path.rsplit('/', 1)[-1]
212 model['path'] = path
212 model['path'] = path
213 model['last_modified'] = last_modified
213 model['last_modified'] = last_modified
214 model['created'] = created
214 model['created'] = created
215 model['content'] = None
215 model['content'] = None
216 model['format'] = None
216 model['format'] = None
217 model['mimetype'] = None
217 model['mimetype'] = None
218 try:
218 try:
219 model['writable'] = os.access(os_path, os.W_OK)
219 model['writable'] = os.access(os_path, os.W_OK)
220 except OSError:
220 except OSError:
221 self.log.error("Failed to check write permissions on %s", os_path)
221 self.log.error("Failed to check write permissions on %s", os_path)
222 model['writable'] = False
222 model['writable'] = False
223 return model
223 return model
224
224
225 def _dir_model(self, path, content=True):
225 def _dir_model(self, path, content=True):
226 """Build a model for a directory
226 """Build a model for a directory
227
227
228 if content is requested, will include a listing of the directory
228 if content is requested, will include a listing of the directory
229 """
229 """
230 os_path = self._get_os_path(path)
230 os_path = self._get_os_path(path)
231
231
232 four_o_four = u'directory does not exist: %r' % path
232 four_o_four = u'directory does not exist: %r' % path
233
233
234 if not os.path.isdir(os_path):
234 if not os.path.isdir(os_path):
235 raise web.HTTPError(404, four_o_four)
235 raise web.HTTPError(404, four_o_four)
236 elif is_hidden(os_path, self.root_dir):
236 elif is_hidden(os_path, self.root_dir):
237 self.log.info("Refusing to serve hidden directory %r, via 404 Error",
237 self.log.info("Refusing to serve hidden directory %r, via 404 Error",
238 os_path
238 os_path
239 )
239 )
240 raise web.HTTPError(404, four_o_four)
240 raise web.HTTPError(404, four_o_four)
241
241
242 model = self._base_model(path)
242 model = self._base_model(path)
243 model['type'] = 'directory'
243 model['type'] = 'directory'
244 if content:
244 if content:
245 model['content'] = contents = []
245 model['content'] = contents = []
246 os_dir = self._get_os_path(path)
246 os_dir = self._get_os_path(path)
247 for name in os.listdir(os_dir):
247 for name in os.listdir(os_dir):
248 os_path = os.path.join(os_dir, name)
248 os_path = os.path.join(os_dir, name)
249 # skip over broken symlinks in listing
249 # skip over broken symlinks in listing
250 if not os.path.exists(os_path):
250 if not os.path.exists(os_path):
251 self.log.warn("%s doesn't exist", os_path)
251 self.log.warn("%s doesn't exist", os_path)
252 continue
252 continue
253 elif not os.path.isfile(os_path) and not os.path.isdir(os_path):
253 elif not os.path.isfile(os_path) and not os.path.isdir(os_path):
254 self.log.debug("%s not a regular file", os_path)
254 self.log.debug("%s not a regular file", os_path)
255 continue
255 continue
256 if self.should_list(name) and not is_hidden(os_path, self.root_dir):
256 if self.should_list(name) and not is_hidden(os_path, self.root_dir):
257 contents.append(self.get(
257 contents.append(self.get(
258 path='%s/%s' % (path, name),
258 path='%s/%s' % (path, name),
259 content=False)
259 content=False)
260 )
260 )
261
261
262 model['format'] = 'json'
262 model['format'] = 'json'
263
263
264 return model
264 return model
265
265
266 def _file_model(self, path, content=True, format=None):
266 def _file_model(self, path, content=True, format=None):
267 """Build a model for a file
267 """Build a model for a file
268
268
269 if content is requested, include the file contents.
269 if content is requested, include the file contents.
270
270
271 format:
271 format:
272 If 'text', the contents will be decoded as UTF-8.
272 If 'text', the contents will be decoded as UTF-8.
273 If 'base64', the raw bytes contents will be encoded as base64.
273 If 'base64', the raw bytes contents will be encoded as base64.
274 If not specified, try to decode as UTF-8, and fall back to base64
274 If not specified, try to decode as UTF-8, and fall back to base64
275 """
275 """
276 model = self._base_model(path)
276 model = self._base_model(path)
277 model['type'] = 'file'
277 model['type'] = 'file'
278
278
279 os_path = self._get_os_path(path)
279 os_path = self._get_os_path(path)
280
280
281 if content:
281 if content:
282 content, format = self._read_file(os_path, format)
282 content, format = self._read_file(os_path, format)
283 default_mime = {
283 default_mime = {
284 'text': 'text/plain',
284 'text': 'text/plain',
285 'base64': 'application/octet-stream'
285 'base64': 'application/octet-stream'
286 }[format]
286 }[format]
287
287
288 model.update(
288 model.update(
289 content=content,
289 content=content,
290 format=format,
290 format=format,
291 mimetype=mimetypes.guess_type(os_path)[0] or default_mime,
291 mimetype=mimetypes.guess_type(os_path)[0] or default_mime,
292 )
292 )
293
293
294 return model
294 return model
295
295
296 def _notebook_model(self, path, content=True):
296 def _notebook_model(self, path, content=True):
297 """Build a notebook model
297 """Build a notebook model
298
298
299 if content is requested, the notebook content will be populated
299 if content is requested, the notebook content will be populated
300 as a JSON structure (not double-serialized)
300 as a JSON structure (not double-serialized)
301 """
301 """
302 model = self._base_model(path)
302 model = self._base_model(path)
303 model['type'] = 'notebook'
303 model['type'] = 'notebook'
304 if content:
304 if content:
305 os_path = self._get_os_path(path)
305 os_path = self._get_os_path(path)
306 nb = self._read_notebook(os_path, as_version=4)
306 nb = self._read_notebook(os_path, as_version=4)
307 self.mark_trusted_cells(nb, path)
307 self.mark_trusted_cells(nb, path)
308 model['content'] = nb
308 model['content'] = nb
309 model['format'] = 'json'
309 model['format'] = 'json'
310 self.validate_notebook_model(model)
310 self.validate_notebook_model(model)
311 return model
311 return model
312
312
313 def get(self, path, content=True, type=None, format=None):
313 def get(self, path, content=True, type=None, format=None):
314 """ Takes a path for an entity and returns its model
314 """ Takes a path for an entity and returns its model
315
315
316 Parameters
316 Parameters
317 ----------
317 ----------
318 path : str
318 path : str
319 the API path that describes the relative path for the target
319 the API path that describes the relative path for the target
320 content : bool
320 content : bool
321 Whether to include the contents in the reply
321 Whether to include the contents in the reply
322 type : str, optional
322 type : str, optional
323 The requested type - 'file', 'notebook', or 'directory'.
323 The requested type - 'file', 'notebook', or 'directory'.
324 Will raise HTTPError 400 if the content doesn't match.
324 Will raise HTTPError 400 if the content doesn't match.
325 format : str, optional
325 format : str, optional
326 The requested format for file contents. 'text' or 'base64'.
326 The requested format for file contents. 'text' or 'base64'.
327 Ignored if this returns a notebook or directory model.
327 Ignored if this returns a notebook or directory model.
328
328
329 Returns
329 Returns
330 -------
330 -------
331 model : dict
331 model : dict
332 the contents model. If content=True, returns the contents
332 the contents model. If content=True, returns the contents
333 of the file or directory as well.
333 of the file or directory as well.
334 """
334 """
335 path = path.strip('/')
335 path = path.strip('/')
336
336
337 if not self.exists(path):
337 if not self.exists(path):
338 raise web.HTTPError(404, u'No such file or directory: %s' % path)
338 raise web.HTTPError(404, u'No such file or directory: %s' % path)
339
339
340 os_path = self._get_os_path(path)
340 os_path = self._get_os_path(path)
341 if os.path.isdir(os_path):
341 if os.path.isdir(os_path):
342 if type not in (None, 'directory'):
342 if type not in (None, 'directory'):
343 raise web.HTTPError(400,
343 raise web.HTTPError(400,
344 u'%s is a directory, not a %s' % (path, type), reason='bad type')
344 u'%s is a directory, not a %s' % (path, type), reason='bad type')
345 model = self._dir_model(path, content=content)
345 model = self._dir_model(path, content=content)
346 elif type == 'notebook' or (type is None and path.endswith('.ipynb')):
346 elif type == 'notebook' or (type is None and path.endswith('.ipynb')):
347 model = self._notebook_model(path, content=content)
347 model = self._notebook_model(path, content=content)
348 else:
348 else:
349 if type == 'directory':
349 if type == 'directory':
350 raise web.HTTPError(400,
350 raise web.HTTPError(400,
351 u'%s is not a directory' % path, reason='bad type')
351 u'%s is not a directory' % path, reason='bad type')
352 model = self._file_model(path, content=content, format=format)
352 model = self._file_model(path, content=content, format=format)
353 return model
353 return model
354
354
355 def _save_directory(self, os_path, model, path=''):
355 def _save_directory(self, os_path, model, path=''):
356 """create a directory"""
356 """create a directory"""
357 if is_hidden(os_path, self.root_dir):
357 if is_hidden(os_path, self.root_dir):
358 raise web.HTTPError(400, u'Cannot create hidden directory %r' % os_path)
358 raise web.HTTPError(400, u'Cannot create hidden directory %r' % os_path)
359 if not os.path.exists(os_path):
359 if not os.path.exists(os_path):
360 with self.perm_to_403():
360 with self.perm_to_403():
361 os.mkdir(os_path)
361 os.mkdir(os_path)
362 elif not os.path.isdir(os_path):
362 elif not os.path.isdir(os_path):
363 raise web.HTTPError(400, u'Not a directory: %s' % (os_path))
363 raise web.HTTPError(400, u'Not a directory: %s' % (os_path))
364 else:
364 else:
365 self.log.debug("Directory %r already exists", os_path)
365 self.log.debug("Directory %r already exists", os_path)
366
366
367 def save(self, model, path=''):
367 def save(self, model, path=''):
368 """Save the file model and return the model with no content."""
368 """Save the file model and return the model with no content."""
369 path = path.strip('/')
369 path = path.strip('/')
370
370
371 if 'type' not in model:
371 if 'type' not in model:
372 raise web.HTTPError(400, u'No file type provided')
372 raise web.HTTPError(400, u'No file type provided')
373 if 'content' not in model and model['type'] != 'directory':
373 if 'content' not in model and model['type'] != 'directory':
374 raise web.HTTPError(400, u'No file content provided')
374 raise web.HTTPError(400, u'No file content provided')
375
375
376 self.run_pre_save_hook(model=model, path=path)
377
378 os_path = self._get_os_path(path)
376 os_path = self._get_os_path(path)
379 self.log.debug("Saving %s", os_path)
377 self.log.debug("Saving %s", os_path)
378
379 self.run_pre_save_hook(model=model, path=path)
380
380 try:
381 try:
381 if model['type'] == 'notebook':
382 if model['type'] == 'notebook':
382 nb = nbformat.from_dict(model['content'])
383 nb = nbformat.from_dict(model['content'])
383 self.check_and_sign(nb, path)
384 self.check_and_sign(nb, path)
384 self._save_notebook(os_path, nb)
385 self._save_notebook(os_path, nb)
385 # One checkpoint should always exist for notebooks.
386 # One checkpoint should always exist for notebooks.
386 if not self.checkpoints.list_checkpoints(path):
387 if not self.checkpoints.list_checkpoints(path):
387 self.create_checkpoint(path)
388 self.create_checkpoint(path)
388 elif model['type'] == 'file':
389 elif model['type'] == 'file':
389 # Missing format will be handled internally by _save_file.
390 # Missing format will be handled internally by _save_file.
390 self._save_file(os_path, model['content'], model.get('format'))
391 self._save_file(os_path, model['content'], model.get('format'))
391 elif model['type'] == 'directory':
392 elif model['type'] == 'directory':
392 self._save_directory(os_path, model, path)
393 self._save_directory(os_path, model, path)
393 else:
394 else:
394 raise web.HTTPError(400, "Unhandled contents type: %s" % model['type'])
395 raise web.HTTPError(400, "Unhandled contents type: %s" % model['type'])
395 except web.HTTPError:
396 except web.HTTPError:
396 raise
397 raise
397 except Exception as e:
398 except Exception as e:
398 self.log.error(u'Error while saving file: %s %s', path, e, exc_info=True)
399 self.log.error(u'Error while saving file: %s %s', path, e, exc_info=True)
399 raise web.HTTPError(500, u'Unexpected error while saving file: %s %s' % (path, e))
400 raise web.HTTPError(500, u'Unexpected error while saving file: %s %s' % (path, e))
400
401
401 validation_message = None
402 validation_message = None
402 if model['type'] == 'notebook':
403 if model['type'] == 'notebook':
403 self.validate_notebook_model(model)
404 self.validate_notebook_model(model)
404 validation_message = model.get('message', None)
405 validation_message = model.get('message', None)
405
406
406 model = self.get(path, content=False)
407 model = self.get(path, content=False)
407 if validation_message:
408 if validation_message:
408 model['message'] = validation_message
409 model['message'] = validation_message
409
410
410 self.run_post_save_hook(model=model, os_path=os_path)
411 self.run_post_save_hook(model=model, os_path=os_path)
411
412
412 return model
413 return model
413
414
414 def delete_file(self, path):
415 def delete_file(self, path):
415 """Delete file at path."""
416 """Delete file at path."""
416 path = path.strip('/')
417 path = path.strip('/')
417 os_path = self._get_os_path(path)
418 os_path = self._get_os_path(path)
418 rm = os.unlink
419 rm = os.unlink
419 if os.path.isdir(os_path):
420 if os.path.isdir(os_path):
420 listing = os.listdir(os_path)
421 listing = os.listdir(os_path)
421 # Don't delete non-empty directories.
422 # Don't delete non-empty directories.
422 # A directory containing only leftover checkpoints is
423 # A directory containing only leftover checkpoints is
423 # considered empty.
424 # considered empty.
424 cp_dir = getattr(self.checkpoints, 'checkpoint_dir', None)
425 cp_dir = getattr(self.checkpoints, 'checkpoint_dir', None)
425 for entry in listing:
426 for entry in listing:
426 if entry != cp_dir:
427 if entry != cp_dir:
427 raise web.HTTPError(400, u'Directory %s not empty' % os_path)
428 raise web.HTTPError(400, u'Directory %s not empty' % os_path)
428 elif not os.path.isfile(os_path):
429 elif not os.path.isfile(os_path):
429 raise web.HTTPError(404, u'File does not exist: %s' % os_path)
430 raise web.HTTPError(404, u'File does not exist: %s' % os_path)
430
431
431 if os.path.isdir(os_path):
432 if os.path.isdir(os_path):
432 self.log.debug("Removing directory %s", os_path)
433 self.log.debug("Removing directory %s", os_path)
433 with self.perm_to_403():
434 with self.perm_to_403():
434 shutil.rmtree(os_path)
435 shutil.rmtree(os_path)
435 else:
436 else:
436 self.log.debug("Unlinking file %s", os_path)
437 self.log.debug("Unlinking file %s", os_path)
437 with self.perm_to_403():
438 with self.perm_to_403():
438 rm(os_path)
439 rm(os_path)
439
440
440 def rename_file(self, old_path, new_path):
441 def rename_file(self, old_path, new_path):
441 """Rename a file."""
442 """Rename a file."""
442 old_path = old_path.strip('/')
443 old_path = old_path.strip('/')
443 new_path = new_path.strip('/')
444 new_path = new_path.strip('/')
444 if new_path == old_path:
445 if new_path == old_path:
445 return
446 return
446
447
447 new_os_path = self._get_os_path(new_path)
448 new_os_path = self._get_os_path(new_path)
448 old_os_path = self._get_os_path(old_path)
449 old_os_path = self._get_os_path(old_path)
449
450
450 # Should we proceed with the move?
451 # Should we proceed with the move?
451 if os.path.exists(new_os_path):
452 if os.path.exists(new_os_path):
452 raise web.HTTPError(409, u'File already exists: %s' % new_path)
453 raise web.HTTPError(409, u'File already exists: %s' % new_path)
453
454
454 # Move the file
455 # Move the file
455 try:
456 try:
456 with self.perm_to_403():
457 with self.perm_to_403():
457 shutil.move(old_os_path, new_os_path)
458 shutil.move(old_os_path, new_os_path)
458 except web.HTTPError:
459 except web.HTTPError:
459 raise
460 raise
460 except Exception as e:
461 except Exception as e:
461 raise web.HTTPError(500, u'Unknown error renaming file: %s %s' % (old_path, e))
462 raise web.HTTPError(500, u'Unknown error renaming file: %s %s' % (old_path, e))
462
463
463 def info_string(self):
464 def info_string(self):
464 return "Serving notebooks from local directory: %s" % self.root_dir
465 return "Serving notebooks from local directory: %s" % self.root_dir
465
466
466 def get_kernel_path(self, path, model=None):
467 def get_kernel_path(self, path, model=None):
467 """Return the initial API path of a kernel associated with a given notebook"""
468 """Return the initial API path of a kernel associated with a given notebook"""
468 if '/' in path:
469 if '/' in path:
469 parent_dir = path.rsplit('/', 1)[0]
470 parent_dir = path.rsplit('/', 1)[0]
470 else:
471 else:
471 parent_dir = ''
472 parent_dir = ''
472 return parent_dir
473 return parent_dir
@@ -1,464 +1,500 b''
1 # coding: utf-8
1 # coding: utf-8
2 """Tests for the notebook manager."""
2 """Tests for the notebook manager."""
3 from __future__ import print_function
3 from __future__ import print_function
4
4
5 import os
5 import os
6 import sys
6 import sys
7 import time
7 import time
8 from contextlib import contextmanager
8
9
9 from nose import SkipTest
10 from nose import SkipTest
10 from tornado.web import HTTPError
11 from tornado.web import HTTPError
11 from unittest import TestCase
12 from unittest import TestCase
12 from tempfile import NamedTemporaryFile
13 from tempfile import NamedTemporaryFile
13
14
14 from IPython.nbformat import v4 as nbformat
15 from IPython.nbformat import v4 as nbformat
15
16
16 from IPython.utils.tempdir import TemporaryDirectory
17 from IPython.utils.tempdir import TemporaryDirectory
17 from IPython.utils.traitlets import TraitError
18 from IPython.utils.traitlets import TraitError
18 from IPython.html.utils import url_path_join
19 from IPython.html.utils import url_path_join
19 from IPython.testing import decorators as dec
20 from IPython.testing import decorators as dec
20
21
21 from ..filemanager import FileContentsManager
22 from ..filemanager import FileContentsManager
22
23
23
24
24 def _make_dir(contents_manager, api_path):
25 def _make_dir(contents_manager, api_path):
25 """
26 """
26 Make a directory.
27 Make a directory.
27 """
28 """
28 os_path = contents_manager._get_os_path(api_path)
29 os_path = contents_manager._get_os_path(api_path)
29 try:
30 try:
30 os.makedirs(os_path)
31 os.makedirs(os_path)
31 except OSError:
32 except OSError:
32 print("Directory already exists: %r" % os_path)
33 print("Directory already exists: %r" % os_path)
33
34
34
35
35 class TestFileContentsManager(TestCase):
36 class TestFileContentsManager(TestCase):
36
37
37 def symlink(self, contents_manager, src, dst):
38 def symlink(self, contents_manager, src, dst):
38 """Make a symlink to src from dst
39 """Make a symlink to src from dst
39
40
40 src and dst are api_paths
41 src and dst are api_paths
41 """
42 """
42 src_os_path = contents_manager._get_os_path(src)
43 src_os_path = contents_manager._get_os_path(src)
43 dst_os_path = contents_manager._get_os_path(dst)
44 dst_os_path = contents_manager._get_os_path(dst)
44 print(src_os_path, dst_os_path, os.path.isfile(src_os_path))
45 print(src_os_path, dst_os_path, os.path.isfile(src_os_path))
45 os.symlink(src_os_path, dst_os_path)
46 os.symlink(src_os_path, dst_os_path)
46
47
47 def test_root_dir(self):
48 def test_root_dir(self):
48 with TemporaryDirectory() as td:
49 with TemporaryDirectory() as td:
49 fm = FileContentsManager(root_dir=td)
50 fm = FileContentsManager(root_dir=td)
50 self.assertEqual(fm.root_dir, td)
51 self.assertEqual(fm.root_dir, td)
51
52
52 def test_missing_root_dir(self):
53 def test_missing_root_dir(self):
53 with TemporaryDirectory() as td:
54 with TemporaryDirectory() as td:
54 root = os.path.join(td, 'notebook', 'dir', 'is', 'missing')
55 root = os.path.join(td, 'notebook', 'dir', 'is', 'missing')
55 self.assertRaises(TraitError, FileContentsManager, root_dir=root)
56 self.assertRaises(TraitError, FileContentsManager, root_dir=root)
56
57
57 def test_invalid_root_dir(self):
58 def test_invalid_root_dir(self):
58 with NamedTemporaryFile() as tf:
59 with NamedTemporaryFile() as tf:
59 self.assertRaises(TraitError, FileContentsManager, root_dir=tf.name)
60 self.assertRaises(TraitError, FileContentsManager, root_dir=tf.name)
60
61
61 def test_get_os_path(self):
62 def test_get_os_path(self):
62 # full filesystem path should be returned with correct operating system
63 # full filesystem path should be returned with correct operating system
63 # separators.
64 # separators.
64 with TemporaryDirectory() as td:
65 with TemporaryDirectory() as td:
65 root = td
66 root = td
66 fm = FileContentsManager(root_dir=root)
67 fm = FileContentsManager(root_dir=root)
67 path = fm._get_os_path('/path/to/notebook/test.ipynb')
68 path = fm._get_os_path('/path/to/notebook/test.ipynb')
68 rel_path_list = '/path/to/notebook/test.ipynb'.split('/')
69 rel_path_list = '/path/to/notebook/test.ipynb'.split('/')
69 fs_path = os.path.join(fm.root_dir, *rel_path_list)
70 fs_path = os.path.join(fm.root_dir, *rel_path_list)
70 self.assertEqual(path, fs_path)
71 self.assertEqual(path, fs_path)
71
72
72 fm = FileContentsManager(root_dir=root)
73 fm = FileContentsManager(root_dir=root)
73 path = fm._get_os_path('test.ipynb')
74 path = fm._get_os_path('test.ipynb')
74 fs_path = os.path.join(fm.root_dir, 'test.ipynb')
75 fs_path = os.path.join(fm.root_dir, 'test.ipynb')
75 self.assertEqual(path, fs_path)
76 self.assertEqual(path, fs_path)
76
77
77 fm = FileContentsManager(root_dir=root)
78 fm = FileContentsManager(root_dir=root)
78 path = fm._get_os_path('////test.ipynb')
79 path = fm._get_os_path('////test.ipynb')
79 fs_path = os.path.join(fm.root_dir, 'test.ipynb')
80 fs_path = os.path.join(fm.root_dir, 'test.ipynb')
80 self.assertEqual(path, fs_path)
81 self.assertEqual(path, fs_path)
81
82
82 def test_checkpoint_subdir(self):
83 def test_checkpoint_subdir(self):
83 subd = u'sub βˆ‚ir'
84 subd = u'sub βˆ‚ir'
84 cp_name = 'test-cp.ipynb'
85 cp_name = 'test-cp.ipynb'
85 with TemporaryDirectory() as td:
86 with TemporaryDirectory() as td:
86 root = td
87 root = td
87 os.mkdir(os.path.join(td, subd))
88 os.mkdir(os.path.join(td, subd))
88 fm = FileContentsManager(root_dir=root)
89 fm = FileContentsManager(root_dir=root)
89 cpm = fm.checkpoints
90 cpm = fm.checkpoints
90 cp_dir = cpm.checkpoint_path(
91 cp_dir = cpm.checkpoint_path(
91 'cp', 'test.ipynb'
92 'cp', 'test.ipynb'
92 )
93 )
93 cp_subdir = cpm.checkpoint_path(
94 cp_subdir = cpm.checkpoint_path(
94 'cp', '/%s/test.ipynb' % subd
95 'cp', '/%s/test.ipynb' % subd
95 )
96 )
96 self.assertNotEqual(cp_dir, cp_subdir)
97 self.assertNotEqual(cp_dir, cp_subdir)
97 self.assertEqual(cp_dir, os.path.join(root, cpm.checkpoint_dir, cp_name))
98 self.assertEqual(cp_dir, os.path.join(root, cpm.checkpoint_dir, cp_name))
98 self.assertEqual(cp_subdir, os.path.join(root, subd, cpm.checkpoint_dir, cp_name))
99 self.assertEqual(cp_subdir, os.path.join(root, subd, cpm.checkpoint_dir, cp_name))
99
100
100 @dec.skip_win32
101 @dec.skip_win32
101 def test_bad_symlink(self):
102 def test_bad_symlink(self):
102 with TemporaryDirectory() as td:
103 with TemporaryDirectory() as td:
103 cm = FileContentsManager(root_dir=td)
104 cm = FileContentsManager(root_dir=td)
104 path = 'test bad symlink'
105 path = 'test bad symlink'
105 _make_dir(cm, path)
106 _make_dir(cm, path)
106
107
107 file_model = cm.new_untitled(path=path, ext='.txt')
108 file_model = cm.new_untitled(path=path, ext='.txt')
108
109
109 # create a broken symlink
110 # create a broken symlink
110 self.symlink(cm, "target", '%s/%s' % (path, 'bad symlink'))
111 self.symlink(cm, "target", '%s/%s' % (path, 'bad symlink'))
111 model = cm.get(path)
112 model = cm.get(path)
112 self.assertEqual(model['content'], [file_model])
113 self.assertEqual(model['content'], [file_model])
113
114
114 @dec.skip_win32
115 @dec.skip_win32
115 def test_good_symlink(self):
116 def test_good_symlink(self):
116 with TemporaryDirectory() as td:
117 with TemporaryDirectory() as td:
117 cm = FileContentsManager(root_dir=td)
118 cm = FileContentsManager(root_dir=td)
118 parent = 'test good symlink'
119 parent = 'test good symlink'
119 name = 'good symlink'
120 name = 'good symlink'
120 path = '{0}/{1}'.format(parent, name)
121 path = '{0}/{1}'.format(parent, name)
121 _make_dir(cm, parent)
122 _make_dir(cm, parent)
122
123
123 file_model = cm.new(path=parent + '/zfoo.txt')
124 file_model = cm.new(path=parent + '/zfoo.txt')
124
125
125 # create a good symlink
126 # create a good symlink
126 self.symlink(cm, file_model['path'], path)
127 self.symlink(cm, file_model['path'], path)
127 symlink_model = cm.get(path, content=False)
128 symlink_model = cm.get(path, content=False)
128 dir_model = cm.get(parent)
129 dir_model = cm.get(parent)
129 self.assertEqual(
130 self.assertEqual(
130 sorted(dir_model['content'], key=lambda x: x['name']),
131 sorted(dir_model['content'], key=lambda x: x['name']),
131 [symlink_model, file_model],
132 [symlink_model, file_model],
132 )
133 )
133
134
134 def test_403(self):
135 def test_403(self):
135 if hasattr(os, 'getuid'):
136 if hasattr(os, 'getuid'):
136 if os.getuid() == 0:
137 if os.getuid() == 0:
137 raise SkipTest("Can't test permissions as root")
138 raise SkipTest("Can't test permissions as root")
138 if sys.platform.startswith('win'):
139 if sys.platform.startswith('win'):
139 raise SkipTest("Can't test permissions on Windows")
140 raise SkipTest("Can't test permissions on Windows")
140
141
141 with TemporaryDirectory() as td:
142 with TemporaryDirectory() as td:
142 cm = FileContentsManager(root_dir=td)
143 cm = FileContentsManager(root_dir=td)
143 model = cm.new_untitled(type='file')
144 model = cm.new_untitled(type='file')
144 os_path = cm._get_os_path(model['path'])
145 os_path = cm._get_os_path(model['path'])
145
146
146 os.chmod(os_path, 0o400)
147 os.chmod(os_path, 0o400)
147 try:
148 try:
148 with cm.open(os_path, 'w') as f:
149 with cm.open(os_path, 'w') as f:
149 f.write(u"don't care")
150 f.write(u"don't care")
150 except HTTPError as e:
151 except HTTPError as e:
151 self.assertEqual(e.status_code, 403)
152 self.assertEqual(e.status_code, 403)
152 else:
153 else:
153 self.fail("Should have raised HTTPError(403)")
154 self.fail("Should have raised HTTPError(403)")
154
155
155
156
156 class TestContentsManager(TestCase):
157 class TestContentsManager(TestCase):
157
158
158 def setUp(self):
159 def setUp(self):
159 self._temp_dir = TemporaryDirectory()
160 self._temp_dir = TemporaryDirectory()
160 self.td = self._temp_dir.name
161 self.td = self._temp_dir.name
161 self.contents_manager = FileContentsManager(
162 self.contents_manager = FileContentsManager(
162 root_dir=self.td,
163 root_dir=self.td,
163 )
164 )
164
165
165 def tearDown(self):
166 def tearDown(self):
166 self._temp_dir.cleanup()
167 self._temp_dir.cleanup()
167
168
169 @contextmanager
170 def assertRaisesHTTPError(self, status, msg=None):
171 msg = msg or "Should have raised HTTPError(%i)" % status
172 try:
173 yield
174 except HTTPError as e:
175 self.assertEqual(e.status_code, status)
176 else:
177 self.fail(msg)
178
168 def make_dir(self, api_path):
179 def make_dir(self, api_path):
169 """make a subdirectory at api_path
180 """make a subdirectory at api_path
170
181
171 override in subclasses if contents are not on the filesystem.
182 override in subclasses if contents are not on the filesystem.
172 """
183 """
173 _make_dir(self.contents_manager, api_path)
184 _make_dir(self.contents_manager, api_path)
174
185
175 def add_code_cell(self, nb):
186 def add_code_cell(self, nb):
176 output = nbformat.new_output("display_data", {'application/javascript': "alert('hi');"})
187 output = nbformat.new_output("display_data", {'application/javascript': "alert('hi');"})
177 cell = nbformat.new_code_cell("print('hi')", outputs=[output])
188 cell = nbformat.new_code_cell("print('hi')", outputs=[output])
178 nb.cells.append(cell)
189 nb.cells.append(cell)
179
190
180 def new_notebook(self):
191 def new_notebook(self):
181 cm = self.contents_manager
192 cm = self.contents_manager
182 model = cm.new_untitled(type='notebook')
193 model = cm.new_untitled(type='notebook')
183 name = model['name']
194 name = model['name']
184 path = model['path']
195 path = model['path']
185
196
186 full_model = cm.get(path)
197 full_model = cm.get(path)
187 nb = full_model['content']
198 nb = full_model['content']
188 nb['metadata']['counter'] = int(1e6 * time.time())
199 nb['metadata']['counter'] = int(1e6 * time.time())
189 self.add_code_cell(nb)
200 self.add_code_cell(nb)
190
201
191 cm.save(full_model, path)
202 cm.save(full_model, path)
192 return nb, name, path
203 return nb, name, path
193
204
194 def test_new_untitled(self):
205 def test_new_untitled(self):
195 cm = self.contents_manager
206 cm = self.contents_manager
196 # Test in root directory
207 # Test in root directory
197 model = cm.new_untitled(type='notebook')
208 model = cm.new_untitled(type='notebook')
198 assert isinstance(model, dict)
209 assert isinstance(model, dict)
199 self.assertIn('name', model)
210 self.assertIn('name', model)
200 self.assertIn('path', model)
211 self.assertIn('path', model)
201 self.assertIn('type', model)
212 self.assertIn('type', model)
202 self.assertEqual(model['type'], 'notebook')
213 self.assertEqual(model['type'], 'notebook')
203 self.assertEqual(model['name'], 'Untitled.ipynb')
214 self.assertEqual(model['name'], 'Untitled.ipynb')
204 self.assertEqual(model['path'], 'Untitled.ipynb')
215 self.assertEqual(model['path'], 'Untitled.ipynb')
205
216
206 # Test in sub-directory
217 # Test in sub-directory
207 model = cm.new_untitled(type='directory')
218 model = cm.new_untitled(type='directory')
208 assert isinstance(model, dict)
219 assert isinstance(model, dict)
209 self.assertIn('name', model)
220 self.assertIn('name', model)
210 self.assertIn('path', model)
221 self.assertIn('path', model)
211 self.assertIn('type', model)
222 self.assertIn('type', model)
212 self.assertEqual(model['type'], 'directory')
223 self.assertEqual(model['type'], 'directory')
213 self.assertEqual(model['name'], 'Untitled Folder')
224 self.assertEqual(model['name'], 'Untitled Folder')
214 self.assertEqual(model['path'], 'Untitled Folder')
225 self.assertEqual(model['path'], 'Untitled Folder')
215 sub_dir = model['path']
226 sub_dir = model['path']
216
227
217 model = cm.new_untitled(path=sub_dir)
228 model = cm.new_untitled(path=sub_dir)
218 assert isinstance(model, dict)
229 assert isinstance(model, dict)
219 self.assertIn('name', model)
230 self.assertIn('name', model)
220 self.assertIn('path', model)
231 self.assertIn('path', model)
221 self.assertIn('type', model)
232 self.assertIn('type', model)
222 self.assertEqual(model['type'], 'file')
233 self.assertEqual(model['type'], 'file')
223 self.assertEqual(model['name'], 'untitled')
234 self.assertEqual(model['name'], 'untitled')
224 self.assertEqual(model['path'], '%s/untitled' % sub_dir)
235 self.assertEqual(model['path'], '%s/untitled' % sub_dir)
225
236
226 def test_get(self):
237 def test_get(self):
227 cm = self.contents_manager
238 cm = self.contents_manager
228 # Create a notebook
239 # Create a notebook
229 model = cm.new_untitled(type='notebook')
240 model = cm.new_untitled(type='notebook')
230 name = model['name']
241 name = model['name']
231 path = model['path']
242 path = model['path']
232
243
233 # Check that we 'get' on the notebook we just created
244 # Check that we 'get' on the notebook we just created
234 model2 = cm.get(path)
245 model2 = cm.get(path)
235 assert isinstance(model2, dict)
246 assert isinstance(model2, dict)
236 self.assertIn('name', model2)
247 self.assertIn('name', model2)
237 self.assertIn('path', model2)
248 self.assertIn('path', model2)
238 self.assertEqual(model['name'], name)
249 self.assertEqual(model['name'], name)
239 self.assertEqual(model['path'], path)
250 self.assertEqual(model['path'], path)
240
251
241 nb_as_file = cm.get(path, content=True, type='file')
252 nb_as_file = cm.get(path, content=True, type='file')
242 self.assertEqual(nb_as_file['path'], path)
253 self.assertEqual(nb_as_file['path'], path)
243 self.assertEqual(nb_as_file['type'], 'file')
254 self.assertEqual(nb_as_file['type'], 'file')
244 self.assertEqual(nb_as_file['format'], 'text')
255 self.assertEqual(nb_as_file['format'], 'text')
245 self.assertNotIsInstance(nb_as_file['content'], dict)
256 self.assertNotIsInstance(nb_as_file['content'], dict)
246
257
247 nb_as_bin_file = cm.get(path, content=True, type='file', format='base64')
258 nb_as_bin_file = cm.get(path, content=True, type='file', format='base64')
248 self.assertEqual(nb_as_bin_file['format'], 'base64')
259 self.assertEqual(nb_as_bin_file['format'], 'base64')
249
260
250 # Test in sub-directory
261 # Test in sub-directory
251 sub_dir = '/foo/'
262 sub_dir = '/foo/'
252 self.make_dir('foo')
263 self.make_dir('foo')
253 model = cm.new_untitled(path=sub_dir, ext='.ipynb')
264 model = cm.new_untitled(path=sub_dir, ext='.ipynb')
254 model2 = cm.get(sub_dir + name)
265 model2 = cm.get(sub_dir + name)
255 assert isinstance(model2, dict)
266 assert isinstance(model2, dict)
256 self.assertIn('name', model2)
267 self.assertIn('name', model2)
257 self.assertIn('path', model2)
268 self.assertIn('path', model2)
258 self.assertIn('content', model2)
269 self.assertIn('content', model2)
259 self.assertEqual(model2['name'], 'Untitled.ipynb')
270 self.assertEqual(model2['name'], 'Untitled.ipynb')
260 self.assertEqual(model2['path'], '{0}/{1}'.format(sub_dir.strip('/'), name))
271 self.assertEqual(model2['path'], '{0}/{1}'.format(sub_dir.strip('/'), name))
261
272
262 # Test with a regular file.
273 # Test with a regular file.
263 file_model_path = cm.new_untitled(path=sub_dir, ext='.txt')['path']
274 file_model_path = cm.new_untitled(path=sub_dir, ext='.txt')['path']
264 file_model = cm.get(file_model_path)
275 file_model = cm.get(file_model_path)
265 self.assertDictContainsSubset(
276 self.assertDictContainsSubset(
266 {
277 {
267 'content': u'',
278 'content': u'',
268 'format': u'text',
279 'format': u'text',
269 'mimetype': u'text/plain',
280 'mimetype': u'text/plain',
270 'name': u'untitled.txt',
281 'name': u'untitled.txt',
271 'path': u'foo/untitled.txt',
282 'path': u'foo/untitled.txt',
272 'type': u'file',
283 'type': u'file',
273 'writable': True,
284 'writable': True,
274 },
285 },
275 file_model,
286 file_model,
276 )
287 )
277 self.assertIn('created', file_model)
288 self.assertIn('created', file_model)
278 self.assertIn('last_modified', file_model)
289 self.assertIn('last_modified', file_model)
279
290
280 # Test getting directory model
291 # Test getting directory model
281
292
282 # Create a sub-sub directory to test getting directory contents with a
293 # Create a sub-sub directory to test getting directory contents with a
283 # subdir.
294 # subdir.
284 self.make_dir('foo/bar')
295 self.make_dir('foo/bar')
285 dirmodel = cm.get('foo')
296 dirmodel = cm.get('foo')
286 self.assertEqual(dirmodel['type'], 'directory')
297 self.assertEqual(dirmodel['type'], 'directory')
287 self.assertIsInstance(dirmodel['content'], list)
298 self.assertIsInstance(dirmodel['content'], list)
288 self.assertEqual(len(dirmodel['content']), 3)
299 self.assertEqual(len(dirmodel['content']), 3)
289 self.assertEqual(dirmodel['path'], 'foo')
300 self.assertEqual(dirmodel['path'], 'foo')
290 self.assertEqual(dirmodel['name'], 'foo')
301 self.assertEqual(dirmodel['name'], 'foo')
291
302
292 # Directory contents should match the contents of each individual entry
303 # Directory contents should match the contents of each individual entry
293 # when requested with content=False.
304 # when requested with content=False.
294 model2_no_content = cm.get(sub_dir + name, content=False)
305 model2_no_content = cm.get(sub_dir + name, content=False)
295 file_model_no_content = cm.get(u'foo/untitled.txt', content=False)
306 file_model_no_content = cm.get(u'foo/untitled.txt', content=False)
296 sub_sub_dir_no_content = cm.get('foo/bar', content=False)
307 sub_sub_dir_no_content = cm.get('foo/bar', content=False)
297 self.assertEqual(sub_sub_dir_no_content['path'], 'foo/bar')
308 self.assertEqual(sub_sub_dir_no_content['path'], 'foo/bar')
298 self.assertEqual(sub_sub_dir_no_content['name'], 'bar')
309 self.assertEqual(sub_sub_dir_no_content['name'], 'bar')
299
310
300 for entry in dirmodel['content']:
311 for entry in dirmodel['content']:
301 # Order isn't guaranteed by the spec, so this is a hacky way of
312 # Order isn't guaranteed by the spec, so this is a hacky way of
302 # verifying that all entries are matched.
313 # verifying that all entries are matched.
303 if entry['path'] == sub_sub_dir_no_content['path']:
314 if entry['path'] == sub_sub_dir_no_content['path']:
304 self.assertEqual(entry, sub_sub_dir_no_content)
315 self.assertEqual(entry, sub_sub_dir_no_content)
305 elif entry['path'] == model2_no_content['path']:
316 elif entry['path'] == model2_no_content['path']:
306 self.assertEqual(entry, model2_no_content)
317 self.assertEqual(entry, model2_no_content)
307 elif entry['path'] == file_model_no_content['path']:
318 elif entry['path'] == file_model_no_content['path']:
308 self.assertEqual(entry, file_model_no_content)
319 self.assertEqual(entry, file_model_no_content)
309 else:
320 else:
310 self.fail("Unexpected directory entry: %s" % entry())
321 self.fail("Unexpected directory entry: %s" % entry())
311
322
312 with self.assertRaises(HTTPError):
323 with self.assertRaises(HTTPError):
313 cm.get('foo', type='file')
324 cm.get('foo', type='file')
314
325
315 def test_update(self):
326 def test_update(self):
316 cm = self.contents_manager
327 cm = self.contents_manager
317 # Create a notebook
328 # Create a notebook
318 model = cm.new_untitled(type='notebook')
329 model = cm.new_untitled(type='notebook')
319 name = model['name']
330 name = model['name']
320 path = model['path']
331 path = model['path']
321
332
322 # Change the name in the model for rename
333 # Change the name in the model for rename
323 model['path'] = 'test.ipynb'
334 model['path'] = 'test.ipynb'
324 model = cm.update(model, path)
335 model = cm.update(model, path)
325 assert isinstance(model, dict)
336 assert isinstance(model, dict)
326 self.assertIn('name', model)
337 self.assertIn('name', model)
327 self.assertIn('path', model)
338 self.assertIn('path', model)
328 self.assertEqual(model['name'], 'test.ipynb')
339 self.assertEqual(model['name'], 'test.ipynb')
329
340
330 # Make sure the old name is gone
341 # Make sure the old name is gone
331 self.assertRaises(HTTPError, cm.get, path)
342 self.assertRaises(HTTPError, cm.get, path)
332
343
333 # Test in sub-directory
344 # Test in sub-directory
334 # Create a directory and notebook in that directory
345 # Create a directory and notebook in that directory
335 sub_dir = '/foo/'
346 sub_dir = '/foo/'
336 self.make_dir('foo')
347 self.make_dir('foo')
337 model = cm.new_untitled(path=sub_dir, type='notebook')
348 model = cm.new_untitled(path=sub_dir, type='notebook')
338 path = model['path']
349 path = model['path']
339
350
340 # Change the name in the model for rename
351 # Change the name in the model for rename
341 d = path.rsplit('/', 1)[0]
352 d = path.rsplit('/', 1)[0]
342 new_path = model['path'] = d + '/test_in_sub.ipynb'
353 new_path = model['path'] = d + '/test_in_sub.ipynb'
343 model = cm.update(model, path)
354 model = cm.update(model, path)
344 assert isinstance(model, dict)
355 assert isinstance(model, dict)
345 self.assertIn('name', model)
356 self.assertIn('name', model)
346 self.assertIn('path', model)
357 self.assertIn('path', model)
347 self.assertEqual(model['name'], 'test_in_sub.ipynb')
358 self.assertEqual(model['name'], 'test_in_sub.ipynb')
348 self.assertEqual(model['path'], new_path)
359 self.assertEqual(model['path'], new_path)
349
360
350 # Make sure the old name is gone
361 # Make sure the old name is gone
351 self.assertRaises(HTTPError, cm.get, path)
362 self.assertRaises(HTTPError, cm.get, path)
352
363
353 def test_save(self):
364 def test_save(self):
354 cm = self.contents_manager
365 cm = self.contents_manager
355 # Create a notebook
366 # Create a notebook
356 model = cm.new_untitled(type='notebook')
367 model = cm.new_untitled(type='notebook')
357 name = model['name']
368 name = model['name']
358 path = model['path']
369 path = model['path']
359
370
360 # Get the model with 'content'
371 # Get the model with 'content'
361 full_model = cm.get(path)
372 full_model = cm.get(path)
362
373
363 # Save the notebook
374 # Save the notebook
364 model = cm.save(full_model, path)
375 model = cm.save(full_model, path)
365 assert isinstance(model, dict)
376 assert isinstance(model, dict)
366 self.assertIn('name', model)
377 self.assertIn('name', model)
367 self.assertIn('path', model)
378 self.assertIn('path', model)
368 self.assertEqual(model['name'], name)
379 self.assertEqual(model['name'], name)
369 self.assertEqual(model['path'], path)
380 self.assertEqual(model['path'], path)
370
381
371 # Test in sub-directory
382 # Test in sub-directory
372 # Create a directory and notebook in that directory
383 # Create a directory and notebook in that directory
373 sub_dir = '/foo/'
384 sub_dir = '/foo/'
374 self.make_dir('foo')
385 self.make_dir('foo')
375 model = cm.new_untitled(path=sub_dir, type='notebook')
386 model = cm.new_untitled(path=sub_dir, type='notebook')
376 name = model['name']
387 name = model['name']
377 path = model['path']
388 path = model['path']
378 model = cm.get(path)
389 model = cm.get(path)
379
390
380 # Change the name in the model for rename
391 # Change the name in the model for rename
381 model = cm.save(model, path)
392 model = cm.save(model, path)
382 assert isinstance(model, dict)
393 assert isinstance(model, dict)
383 self.assertIn('name', model)
394 self.assertIn('name', model)
384 self.assertIn('path', model)
395 self.assertIn('path', model)
385 self.assertEqual(model['name'], 'Untitled.ipynb')
396 self.assertEqual(model['name'], 'Untitled.ipynb')
386 self.assertEqual(model['path'], 'foo/Untitled.ipynb')
397 self.assertEqual(model['path'], 'foo/Untitled.ipynb')
387
398
388 def test_delete(self):
399 def test_delete(self):
389 cm = self.contents_manager
400 cm = self.contents_manager
390 # Create a notebook
401 # Create a notebook
391 nb, name, path = self.new_notebook()
402 nb, name, path = self.new_notebook()
392
403
393 # Delete the notebook
404 # Delete the notebook
394 cm.delete(path)
405 cm.delete(path)
395
406
396 # Check that deleting a non-existent path raises an error.
407 # Check that deleting a non-existent path raises an error.
397 self.assertRaises(HTTPError, cm.delete, path)
408 self.assertRaises(HTTPError, cm.delete, path)
398
409
399 # Check that a 'get' on the deleted notebook raises and error
410 # Check that a 'get' on the deleted notebook raises and error
400 self.assertRaises(HTTPError, cm.get, path)
411 self.assertRaises(HTTPError, cm.get, path)
401
412
402 def test_copy(self):
413 def test_copy(self):
403 cm = self.contents_manager
414 cm = self.contents_manager
404 parent = u'Γ₯ b'
415 parent = u'Γ₯ b'
405 name = u'nb √.ipynb'
416 name = u'nb √.ipynb'
406 path = u'{0}/{1}'.format(parent, name)
417 path = u'{0}/{1}'.format(parent, name)
407 self.make_dir(parent)
418 self.make_dir(parent)
408
419
409 orig = cm.new(path=path)
420 orig = cm.new(path=path)
410 # copy with unspecified name
421 # copy with unspecified name
411 copy = cm.copy(path)
422 copy = cm.copy(path)
412 self.assertEqual(copy['name'], orig['name'].replace('.ipynb', '-Copy1.ipynb'))
423 self.assertEqual(copy['name'], orig['name'].replace('.ipynb', '-Copy1.ipynb'))
413
424
414 # copy with specified name
425 # copy with specified name
415 copy2 = cm.copy(path, u'Γ₯ b/copy 2.ipynb')
426 copy2 = cm.copy(path, u'Γ₯ b/copy 2.ipynb')
416 self.assertEqual(copy2['name'], u'copy 2.ipynb')
427 self.assertEqual(copy2['name'], u'copy 2.ipynb')
417 self.assertEqual(copy2['path'], u'Γ₯ b/copy 2.ipynb')
428 self.assertEqual(copy2['path'], u'Γ₯ b/copy 2.ipynb')
418 # copy with specified path
429 # copy with specified path
419 copy2 = cm.copy(path, u'/')
430 copy2 = cm.copy(path, u'/')
420 self.assertEqual(copy2['name'], name)
431 self.assertEqual(copy2['name'], name)
421 self.assertEqual(copy2['path'], name)
432 self.assertEqual(copy2['path'], name)
422
433
423 def test_trust_notebook(self):
434 def test_trust_notebook(self):
424 cm = self.contents_manager
435 cm = self.contents_manager
425 nb, name, path = self.new_notebook()
436 nb, name, path = self.new_notebook()
426
437
427 untrusted = cm.get(path)['content']
438 untrusted = cm.get(path)['content']
428 assert not cm.notary.check_cells(untrusted)
439 assert not cm.notary.check_cells(untrusted)
429
440
430 # print(untrusted)
441 # print(untrusted)
431 cm.trust_notebook(path)
442 cm.trust_notebook(path)
432 trusted = cm.get(path)['content']
443 trusted = cm.get(path)['content']
433 # print(trusted)
444 # print(trusted)
434 assert cm.notary.check_cells(trusted)
445 assert cm.notary.check_cells(trusted)
435
446
436 def test_mark_trusted_cells(self):
447 def test_mark_trusted_cells(self):
437 cm = self.contents_manager
448 cm = self.contents_manager
438 nb, name, path = self.new_notebook()
449 nb, name, path = self.new_notebook()
439
450
440 cm.mark_trusted_cells(nb, path)
451 cm.mark_trusted_cells(nb, path)
441 for cell in nb.cells:
452 for cell in nb.cells:
442 if cell.cell_type == 'code':
453 if cell.cell_type == 'code':
443 assert not cell.metadata.trusted
454 assert not cell.metadata.trusted
444
455
445 cm.trust_notebook(path)
456 cm.trust_notebook(path)
446 nb = cm.get(path)['content']
457 nb = cm.get(path)['content']
447 for cell in nb.cells:
458 for cell in nb.cells:
448 if cell.cell_type == 'code':
459 if cell.cell_type == 'code':
449 assert cell.metadata.trusted
460 assert cell.metadata.trusted
450
461
451 def test_check_and_sign(self):
462 def test_check_and_sign(self):
452 cm = self.contents_manager
463 cm = self.contents_manager
453 nb, name, path = self.new_notebook()
464 nb, name, path = self.new_notebook()
454
465
455 cm.mark_trusted_cells(nb, path)
466 cm.mark_trusted_cells(nb, path)
456 cm.check_and_sign(nb, path)
467 cm.check_and_sign(nb, path)
457 assert not cm.notary.check_signature(nb)
468 assert not cm.notary.check_signature(nb)
458
469
459 cm.trust_notebook(path)
470 cm.trust_notebook(path)
460 nb = cm.get(path)['content']
471 nb = cm.get(path)['content']
461 cm.mark_trusted_cells(nb, path)
472 cm.mark_trusted_cells(nb, path)
462 cm.check_and_sign(nb, path)
473 cm.check_and_sign(nb, path)
463 assert cm.notary.check_signature(nb)
474 assert cm.notary.check_signature(nb)
475
476 def test_escape_root(self):
477 cm = self.contents_manager
478 # make foo, bar next to root
479 with open(os.path.join(cm.root_dir, '..', 'foo'), 'w') as f:
480 f.write('foo')
481 with open(os.path.join(cm.root_dir, '..', 'bar'), 'w') as f:
482 f.write('bar')
483
484 with self.assertRaisesHTTPError(404):
485 cm.get('..')
486 with self.assertRaisesHTTPError(404):
487 cm.get('foo/../../../bar')
488 with self.assertRaisesHTTPError(404):
489 cm.delete('../foo')
490 with self.assertRaisesHTTPError(404):
491 cm.rename('../foo', '../bar')
492 with self.assertRaisesHTTPError(404):
493 cm.save(model={
494 'type': 'file',
495 'content': u'',
496 'format': 'text',
497 }, path='../foo')
498
499
464
500
General Comments 0
You need to be logged in to leave comments. Login now