##// END OF EJS Templates
ContentsManager type kwarg to match model key...
Min RK -
Show More
@@ -1,622 +1,622 b''
1 """A contents manager that uses the local file system for storage."""
1 """A contents manager that uses the local file system for storage."""
2
2
3 # Copyright (c) IPython Development Team.
3 # Copyright (c) IPython Development Team.
4 # Distributed under the terms of the Modified BSD License.
4 # Distributed under the terms of the Modified BSD License.
5
5
6 import base64
6 import base64
7 import errno
7 import errno
8 import io
8 import io
9 import os
9 import os
10 import shutil
10 import shutil
11 from contextlib import contextmanager
11 from contextlib import contextmanager
12 import mimetypes
12 import mimetypes
13
13
14 from tornado import web
14 from tornado import web
15
15
16 from .manager import ContentsManager
16 from .manager import ContentsManager
17 from IPython import nbformat
17 from IPython import nbformat
18 from IPython.utils.io import atomic_writing
18 from IPython.utils.io import atomic_writing
19 from IPython.utils.path import ensure_dir_exists
19 from IPython.utils.path import ensure_dir_exists
20 from IPython.utils.traitlets import Unicode, Bool, TraitError
20 from IPython.utils.traitlets import Unicode, Bool, TraitError
21 from IPython.utils.py3compat import getcwd, str_to_unicode
21 from IPython.utils.py3compat import getcwd, str_to_unicode
22 from IPython.utils import tz
22 from IPython.utils import tz
23 from IPython.html.utils import is_hidden, to_os_path, to_api_path
23 from IPython.html.utils import is_hidden, to_os_path, to_api_path
24
24
25
25
26 class FileContentsManager(ContentsManager):
26 class FileContentsManager(ContentsManager):
27
27
28 root_dir = Unicode(config=True)
28 root_dir = Unicode(config=True)
29
29
30 def _root_dir_default(self):
30 def _root_dir_default(self):
31 try:
31 try:
32 return self.parent.notebook_dir
32 return self.parent.notebook_dir
33 except AttributeError:
33 except AttributeError:
34 return getcwd()
34 return getcwd()
35
35
36 @contextmanager
36 @contextmanager
37 def perm_to_403(self, os_path=''):
37 def perm_to_403(self, os_path=''):
38 """context manager for turning permission errors into 403"""
38 """context manager for turning permission errors into 403"""
39 try:
39 try:
40 yield
40 yield
41 except OSError as e:
41 except OSError as e:
42 if e.errno in {errno.EPERM, errno.EACCES}:
42 if e.errno in {errno.EPERM, errno.EACCES}:
43 # make 403 error message without root prefix
43 # make 403 error message without root prefix
44 # this may not work perfectly on unicode paths on Python 2,
44 # this may not work perfectly on unicode paths on Python 2,
45 # but nobody should be doing that anyway.
45 # but nobody should be doing that anyway.
46 if not os_path:
46 if not os_path:
47 os_path = str_to_unicode(e.filename or 'unknown file')
47 os_path = str_to_unicode(e.filename or 'unknown file')
48 path = to_api_path(os_path, self.root_dir)
48 path = to_api_path(os_path, self.root_dir)
49 raise web.HTTPError(403, u'Permission denied: %s' % path)
49 raise web.HTTPError(403, u'Permission denied: %s' % path)
50 else:
50 else:
51 raise
51 raise
52
52
53 @contextmanager
53 @contextmanager
54 def open(self, os_path, *args, **kwargs):
54 def open(self, os_path, *args, **kwargs):
55 """wrapper around io.open that turns permission errors into 403"""
55 """wrapper around io.open that turns permission errors into 403"""
56 with self.perm_to_403(os_path):
56 with self.perm_to_403(os_path):
57 with io.open(os_path, *args, **kwargs) as f:
57 with io.open(os_path, *args, **kwargs) as f:
58 yield f
58 yield f
59
59
60 @contextmanager
60 @contextmanager
61 def atomic_writing(self, os_path, *args, **kwargs):
61 def atomic_writing(self, os_path, *args, **kwargs):
62 """wrapper around atomic_writing that turns permission errors into 403"""
62 """wrapper around atomic_writing that turns permission errors into 403"""
63 with self.perm_to_403(os_path):
63 with self.perm_to_403(os_path):
64 with atomic_writing(os_path, *args, **kwargs) as f:
64 with atomic_writing(os_path, *args, **kwargs) as f:
65 yield f
65 yield f
66
66
67 save_script = Bool(False, config=True, help='DEPRECATED, IGNORED')
67 save_script = Bool(False, config=True, help='DEPRECATED, IGNORED')
68 def _save_script_changed(self):
68 def _save_script_changed(self):
69 self.log.warn("""
69 self.log.warn("""
70 Automatically saving notebooks as scripts has been removed.
70 Automatically saving notebooks as scripts has been removed.
71 Use `ipython nbconvert --to python [notebook]` instead.
71 Use `ipython nbconvert --to python [notebook]` instead.
72 """)
72 """)
73
73
74 def _root_dir_changed(self, name, old, new):
74 def _root_dir_changed(self, name, old, new):
75 """Do a bit of validation of the root_dir."""
75 """Do a bit of validation of the root_dir."""
76 if not os.path.isabs(new):
76 if not os.path.isabs(new):
77 # If we receive a non-absolute path, make it absolute.
77 # If we receive a non-absolute path, make it absolute.
78 self.root_dir = os.path.abspath(new)
78 self.root_dir = os.path.abspath(new)
79 return
79 return
80 if not os.path.isdir(new):
80 if not os.path.isdir(new):
81 raise TraitError("%r is not a directory" % new)
81 raise TraitError("%r is not a directory" % new)
82
82
83 checkpoint_dir = Unicode('.ipynb_checkpoints', config=True,
83 checkpoint_dir = Unicode('.ipynb_checkpoints', config=True,
84 help="""The directory name in which to keep file checkpoints
84 help="""The directory name in which to keep file checkpoints
85
85
86 This is a path relative to the file's own directory.
86 This is a path relative to the file's own directory.
87
87
88 By default, it is .ipynb_checkpoints
88 By default, it is .ipynb_checkpoints
89 """
89 """
90 )
90 )
91
91
92 def _copy(self, src, dest):
92 def _copy(self, src, dest):
93 """copy src to dest
93 """copy src to dest
94
94
95 like shutil.copy2, but log errors in copystat
95 like shutil.copy2, but log errors in copystat
96 """
96 """
97 shutil.copyfile(src, dest)
97 shutil.copyfile(src, dest)
98 try:
98 try:
99 shutil.copystat(src, dest)
99 shutil.copystat(src, dest)
100 except OSError as e:
100 except OSError as e:
101 self.log.debug("copystat on %s failed", dest, exc_info=True)
101 self.log.debug("copystat on %s failed", dest, exc_info=True)
102
102
103 def _get_os_path(self, path):
103 def _get_os_path(self, path):
104 """Given an API path, return its file system path.
104 """Given an API path, return its file system path.
105
105
106 Parameters
106 Parameters
107 ----------
107 ----------
108 path : string
108 path : string
109 The relative API path to the named file.
109 The relative API path to the named file.
110
110
111 Returns
111 Returns
112 -------
112 -------
113 path : string
113 path : string
114 Native, absolute OS path to for a file.
114 Native, absolute OS path to for a file.
115 """
115 """
116 return to_os_path(path, self.root_dir)
116 return to_os_path(path, self.root_dir)
117
117
118 def dir_exists(self, path):
118 def dir_exists(self, path):
119 """Does the API-style path refer to an extant directory?
119 """Does the API-style path refer to an extant directory?
120
120
121 API-style wrapper for os.path.isdir
121 API-style wrapper for os.path.isdir
122
122
123 Parameters
123 Parameters
124 ----------
124 ----------
125 path : string
125 path : string
126 The path to check. This is an API path (`/` separated,
126 The path to check. This is an API path (`/` separated,
127 relative to root_dir).
127 relative to root_dir).
128
128
129 Returns
129 Returns
130 -------
130 -------
131 exists : bool
131 exists : bool
132 Whether the path is indeed a directory.
132 Whether the path is indeed a directory.
133 """
133 """
134 path = path.strip('/')
134 path = path.strip('/')
135 os_path = self._get_os_path(path=path)
135 os_path = self._get_os_path(path=path)
136 return os.path.isdir(os_path)
136 return os.path.isdir(os_path)
137
137
138 def is_hidden(self, path):
138 def is_hidden(self, path):
139 """Does the API style path correspond to a hidden directory or file?
139 """Does the API style path correspond to a hidden directory or file?
140
140
141 Parameters
141 Parameters
142 ----------
142 ----------
143 path : string
143 path : string
144 The path to check. This is an API path (`/` separated,
144 The path to check. This is an API path (`/` separated,
145 relative to root_dir).
145 relative to root_dir).
146
146
147 Returns
147 Returns
148 -------
148 -------
149 hidden : bool
149 hidden : bool
150 Whether the path exists and is hidden.
150 Whether the path exists and is hidden.
151 """
151 """
152 path = path.strip('/')
152 path = path.strip('/')
153 os_path = self._get_os_path(path=path)
153 os_path = self._get_os_path(path=path)
154 return is_hidden(os_path, self.root_dir)
154 return is_hidden(os_path, self.root_dir)
155
155
156 def file_exists(self, path):
156 def file_exists(self, path):
157 """Returns True if the file exists, else returns False.
157 """Returns True if the file exists, else returns False.
158
158
159 API-style wrapper for os.path.isfile
159 API-style wrapper for os.path.isfile
160
160
161 Parameters
161 Parameters
162 ----------
162 ----------
163 path : string
163 path : string
164 The relative path to the file (with '/' as separator)
164 The relative path to the file (with '/' as separator)
165
165
166 Returns
166 Returns
167 -------
167 -------
168 exists : bool
168 exists : bool
169 Whether the file exists.
169 Whether the file exists.
170 """
170 """
171 path = path.strip('/')
171 path = path.strip('/')
172 os_path = self._get_os_path(path)
172 os_path = self._get_os_path(path)
173 return os.path.isfile(os_path)
173 return os.path.isfile(os_path)
174
174
175 def exists(self, path):
175 def exists(self, path):
176 """Returns True if the path exists, else returns False.
176 """Returns True if the path exists, else returns False.
177
177
178 API-style wrapper for os.path.exists
178 API-style wrapper for os.path.exists
179
179
180 Parameters
180 Parameters
181 ----------
181 ----------
182 path : string
182 path : string
183 The API path to the file (with '/' as separator)
183 The API path to the file (with '/' as separator)
184
184
185 Returns
185 Returns
186 -------
186 -------
187 exists : bool
187 exists : bool
188 Whether the target exists.
188 Whether the target exists.
189 """
189 """
190 path = path.strip('/')
190 path = path.strip('/')
191 os_path = self._get_os_path(path=path)
191 os_path = self._get_os_path(path=path)
192 return os.path.exists(os_path)
192 return os.path.exists(os_path)
193
193
194 def _base_model(self, path):
194 def _base_model(self, path):
195 """Build the common base of a contents model"""
195 """Build the common base of a contents model"""
196 os_path = self._get_os_path(path)
196 os_path = self._get_os_path(path)
197 info = os.stat(os_path)
197 info = os.stat(os_path)
198 last_modified = tz.utcfromtimestamp(info.st_mtime)
198 last_modified = tz.utcfromtimestamp(info.st_mtime)
199 created = tz.utcfromtimestamp(info.st_ctime)
199 created = tz.utcfromtimestamp(info.st_ctime)
200 # Create the base model.
200 # Create the base model.
201 model = {}
201 model = {}
202 model['name'] = path.rsplit('/', 1)[-1]
202 model['name'] = path.rsplit('/', 1)[-1]
203 model['path'] = path
203 model['path'] = path
204 model['last_modified'] = last_modified
204 model['last_modified'] = last_modified
205 model['created'] = created
205 model['created'] = created
206 model['content'] = None
206 model['content'] = None
207 model['format'] = None
207 model['format'] = None
208 model['mimetype'] = None
208 model['mimetype'] = None
209 try:
209 try:
210 model['writable'] = os.access(os_path, os.W_OK)
210 model['writable'] = os.access(os_path, os.W_OK)
211 except OSError:
211 except OSError:
212 self.log.error("Failed to check write permissions on %s", os_path)
212 self.log.error("Failed to check write permissions on %s", os_path)
213 model['writable'] = False
213 model['writable'] = False
214 return model
214 return model
215
215
216 def _dir_model(self, path, content=True):
216 def _dir_model(self, path, content=True):
217 """Build a model for a directory
217 """Build a model for a directory
218
218
219 if content is requested, will include a listing of the directory
219 if content is requested, will include a listing of the directory
220 """
220 """
221 os_path = self._get_os_path(path)
221 os_path = self._get_os_path(path)
222
222
223 four_o_four = u'directory does not exist: %r' % path
223 four_o_four = u'directory does not exist: %r' % path
224
224
225 if not os.path.isdir(os_path):
225 if not os.path.isdir(os_path):
226 raise web.HTTPError(404, four_o_four)
226 raise web.HTTPError(404, four_o_four)
227 elif is_hidden(os_path, self.root_dir):
227 elif is_hidden(os_path, self.root_dir):
228 self.log.info("Refusing to serve hidden directory %r, via 404 Error",
228 self.log.info("Refusing to serve hidden directory %r, via 404 Error",
229 os_path
229 os_path
230 )
230 )
231 raise web.HTTPError(404, four_o_four)
231 raise web.HTTPError(404, four_o_four)
232
232
233 model = self._base_model(path)
233 model = self._base_model(path)
234 model['type'] = 'directory'
234 model['type'] = 'directory'
235 if content:
235 if content:
236 model['content'] = contents = []
236 model['content'] = contents = []
237 os_dir = self._get_os_path(path)
237 os_dir = self._get_os_path(path)
238 for name in os.listdir(os_dir):
238 for name in os.listdir(os_dir):
239 os_path = os.path.join(os_dir, name)
239 os_path = os.path.join(os_dir, name)
240 # skip over broken symlinks in listing
240 # skip over broken symlinks in listing
241 if not os.path.exists(os_path):
241 if not os.path.exists(os_path):
242 self.log.warn("%s doesn't exist", os_path)
242 self.log.warn("%s doesn't exist", os_path)
243 continue
243 continue
244 elif not os.path.isfile(os_path) and not os.path.isdir(os_path):
244 elif not os.path.isfile(os_path) and not os.path.isdir(os_path):
245 self.log.debug("%s not a regular file", os_path)
245 self.log.debug("%s not a regular file", os_path)
246 continue
246 continue
247 if self.should_list(name) and not is_hidden(os_path, self.root_dir):
247 if self.should_list(name) and not is_hidden(os_path, self.root_dir):
248 contents.append(self.get(
248 contents.append(self.get(
249 path='%s/%s' % (path, name),
249 path='%s/%s' % (path, name),
250 content=False)
250 content=False)
251 )
251 )
252
252
253 model['format'] = 'json'
253 model['format'] = 'json'
254
254
255 return model
255 return model
256
256
257 def _file_model(self, path, content=True, format=None):
257 def _file_model(self, path, content=True, format=None):
258 """Build a model for a file
258 """Build a model for a file
259
259
260 if content is requested, include the file contents.
260 if content is requested, include the file contents.
261
261
262 format:
262 format:
263 If 'text', the contents will be decoded as UTF-8.
263 If 'text', the contents will be decoded as UTF-8.
264 If 'base64', the raw bytes contents will be encoded as base64.
264 If 'base64', the raw bytes contents will be encoded as base64.
265 If not specified, try to decode as UTF-8, and fall back to base64
265 If not specified, try to decode as UTF-8, and fall back to base64
266 """
266 """
267 model = self._base_model(path)
267 model = self._base_model(path)
268 model['type'] = 'file'
268 model['type'] = 'file'
269
269
270 os_path = self._get_os_path(path)
270 os_path = self._get_os_path(path)
271
271
272 if content:
272 if content:
273 if not os.path.isfile(os_path):
273 if not os.path.isfile(os_path):
274 # could be FIFO
274 # could be FIFO
275 raise web.HTTPError(400, "Cannot get content of non-file %s" % os_path)
275 raise web.HTTPError(400, "Cannot get content of non-file %s" % os_path)
276 with self.open(os_path, 'rb') as f:
276 with self.open(os_path, 'rb') as f:
277 bcontent = f.read()
277 bcontent = f.read()
278
278
279 if format != 'base64':
279 if format != 'base64':
280 try:
280 try:
281 model['content'] = bcontent.decode('utf8')
281 model['content'] = bcontent.decode('utf8')
282 except UnicodeError as e:
282 except UnicodeError as e:
283 if format == 'text':
283 if format == 'text':
284 raise web.HTTPError(400, "%s is not UTF-8 encoded" % path, reason='bad format')
284 raise web.HTTPError(400, "%s is not UTF-8 encoded" % path, reason='bad format')
285 else:
285 else:
286 model['format'] = 'text'
286 model['format'] = 'text'
287 default_mime = 'text/plain'
287 default_mime = 'text/plain'
288
288
289 if model['content'] is None:
289 if model['content'] is None:
290 model['content'] = base64.encodestring(bcontent).decode('ascii')
290 model['content'] = base64.encodestring(bcontent).decode('ascii')
291 model['format'] = 'base64'
291 model['format'] = 'base64'
292 default_mime = 'application/octet-stream'
292 default_mime = 'application/octet-stream'
293
293
294 model['mimetype'] = mimetypes.guess_type(os_path)[0] or default_mime
294 model['mimetype'] = mimetypes.guess_type(os_path)[0] or default_mime
295
295
296 return model
296 return model
297
297
298
298
299 def _notebook_model(self, path, content=True):
299 def _notebook_model(self, path, content=True):
300 """Build a notebook model
300 """Build a notebook model
301
301
302 if content is requested, the notebook content will be populated
302 if content is requested, the notebook content will be populated
303 as a JSON structure (not double-serialized)
303 as a JSON structure (not double-serialized)
304 """
304 """
305 model = self._base_model(path)
305 model = self._base_model(path)
306 model['type'] = 'notebook'
306 model['type'] = 'notebook'
307 if content:
307 if content:
308 os_path = self._get_os_path(path)
308 os_path = self._get_os_path(path)
309 with self.open(os_path, 'r', encoding='utf-8') as f:
309 with self.open(os_path, 'r', encoding='utf-8') as f:
310 try:
310 try:
311 nb = nbformat.read(f, as_version=4)
311 nb = nbformat.read(f, as_version=4)
312 except Exception as e:
312 except Exception as e:
313 raise web.HTTPError(400, u"Unreadable Notebook: %s %r" % (os_path, e))
313 raise web.HTTPError(400, u"Unreadable Notebook: %s %r" % (os_path, e))
314 self.mark_trusted_cells(nb, path)
314 self.mark_trusted_cells(nb, path)
315 model['content'] = nb
315 model['content'] = nb
316 model['format'] = 'json'
316 model['format'] = 'json'
317 self.validate_notebook_model(model)
317 self.validate_notebook_model(model)
318 return model
318 return model
319
319
320 def get(self, path, content=True, type_=None, format=None):
320 def get(self, path, content=True, type=None, format=None):
321 """ Takes a path for an entity and returns its model
321 """ Takes a path for an entity and returns its model
322
322
323 Parameters
323 Parameters
324 ----------
324 ----------
325 path : str
325 path : str
326 the API path that describes the relative path for the target
326 the API path that describes the relative path for the target
327 content : bool
327 content : bool
328 Whether to include the contents in the reply
328 Whether to include the contents in the reply
329 type_ : str, optional
329 type : str, optional
330 The requested type - 'file', 'notebook', or 'directory'.
330 The requested type - 'file', 'notebook', or 'directory'.
331 Will raise HTTPError 400 if the content doesn't match.
331 Will raise HTTPError 400 if the content doesn't match.
332 format : str, optional
332 format : str, optional
333 The requested format for file contents. 'text' or 'base64'.
333 The requested format for file contents. 'text' or 'base64'.
334 Ignored if this returns a notebook or directory model.
334 Ignored if this returns a notebook or directory model.
335
335
336 Returns
336 Returns
337 -------
337 -------
338 model : dict
338 model : dict
339 the contents model. If content=True, returns the contents
339 the contents model. If content=True, returns the contents
340 of the file or directory as well.
340 of the file or directory as well.
341 """
341 """
342 path = path.strip('/')
342 path = path.strip('/')
343
343
344 if not self.exists(path):
344 if not self.exists(path):
345 raise web.HTTPError(404, u'No such file or directory: %s' % path)
345 raise web.HTTPError(404, u'No such file or directory: %s' % path)
346
346
347 os_path = self._get_os_path(path)
347 os_path = self._get_os_path(path)
348 if os.path.isdir(os_path):
348 if os.path.isdir(os_path):
349 if type_ not in (None, 'directory'):
349 if type not in (None, 'directory'):
350 raise web.HTTPError(400,
350 raise web.HTTPError(400,
351 u'%s is a directory, not a %s' % (path, type_), reason='bad type')
351 u'%s is a directory, not a %s' % (path, type), reason='bad type')
352 model = self._dir_model(path, content=content)
352 model = self._dir_model(path, content=content)
353 elif type_ == 'notebook' or (type_ is None and path.endswith('.ipynb')):
353 elif type == 'notebook' or (type is None and path.endswith('.ipynb')):
354 model = self._notebook_model(path, content=content)
354 model = self._notebook_model(path, content=content)
355 else:
355 else:
356 if type_ == 'directory':
356 if type == 'directory':
357 raise web.HTTPError(400,
357 raise web.HTTPError(400,
358 u'%s is not a directory', reason='bad type')
358 u'%s is not a directory', reason='bad type')
359 model = self._file_model(path, content=content, format=format)
359 model = self._file_model(path, content=content, format=format)
360 return model
360 return model
361
361
362 def _save_notebook(self, os_path, model, path=''):
362 def _save_notebook(self, os_path, model, path=''):
363 """save a notebook file"""
363 """save a notebook file"""
364 # Save the notebook file
364 # Save the notebook file
365 nb = nbformat.from_dict(model['content'])
365 nb = nbformat.from_dict(model['content'])
366
366
367 self.check_and_sign(nb, path)
367 self.check_and_sign(nb, path)
368
368
369 with self.atomic_writing(os_path, encoding='utf-8') as f:
369 with self.atomic_writing(os_path, encoding='utf-8') as f:
370 nbformat.write(nb, f, version=nbformat.NO_CONVERT)
370 nbformat.write(nb, f, version=nbformat.NO_CONVERT)
371
371
372 def _save_file(self, os_path, model, path=''):
372 def _save_file(self, os_path, model, path=''):
373 """save a non-notebook file"""
373 """save a non-notebook file"""
374 fmt = model.get('format', None)
374 fmt = model.get('format', None)
375 if fmt not in {'text', 'base64'}:
375 if fmt not in {'text', 'base64'}:
376 raise web.HTTPError(400, "Must specify format of file contents as 'text' or 'base64'")
376 raise web.HTTPError(400, "Must specify format of file contents as 'text' or 'base64'")
377 try:
377 try:
378 content = model['content']
378 content = model['content']
379 if fmt == 'text':
379 if fmt == 'text':
380 bcontent = content.encode('utf8')
380 bcontent = content.encode('utf8')
381 else:
381 else:
382 b64_bytes = content.encode('ascii')
382 b64_bytes = content.encode('ascii')
383 bcontent = base64.decodestring(b64_bytes)
383 bcontent = base64.decodestring(b64_bytes)
384 except Exception as e:
384 except Exception as e:
385 raise web.HTTPError(400, u'Encoding error saving %s: %s' % (os_path, e))
385 raise web.HTTPError(400, u'Encoding error saving %s: %s' % (os_path, e))
386 with self.atomic_writing(os_path, text=False) as f:
386 with self.atomic_writing(os_path, text=False) as f:
387 f.write(bcontent)
387 f.write(bcontent)
388
388
389 def _save_directory(self, os_path, model, path=''):
389 def _save_directory(self, os_path, model, path=''):
390 """create a directory"""
390 """create a directory"""
391 if is_hidden(os_path, self.root_dir):
391 if is_hidden(os_path, self.root_dir):
392 raise web.HTTPError(400, u'Cannot create hidden directory %r' % os_path)
392 raise web.HTTPError(400, u'Cannot create hidden directory %r' % os_path)
393 if not os.path.exists(os_path):
393 if not os.path.exists(os_path):
394 with self.perm_to_403():
394 with self.perm_to_403():
395 os.mkdir(os_path)
395 os.mkdir(os_path)
396 elif not os.path.isdir(os_path):
396 elif not os.path.isdir(os_path):
397 raise web.HTTPError(400, u'Not a directory: %s' % (os_path))
397 raise web.HTTPError(400, u'Not a directory: %s' % (os_path))
398 else:
398 else:
399 self.log.debug("Directory %r already exists", os_path)
399 self.log.debug("Directory %r already exists", os_path)
400
400
401 def save(self, model, path=''):
401 def save(self, model, path=''):
402 """Save the file model and return the model with no content."""
402 """Save the file model and return the model with no content."""
403 path = path.strip('/')
403 path = path.strip('/')
404
404
405 if 'type' not in model:
405 if 'type' not in model:
406 raise web.HTTPError(400, u'No file type provided')
406 raise web.HTTPError(400, u'No file type provided')
407 if 'content' not in model and model['type'] != 'directory':
407 if 'content' not in model and model['type'] != 'directory':
408 raise web.HTTPError(400, u'No file content provided')
408 raise web.HTTPError(400, u'No file content provided')
409
409
410 # One checkpoint should always exist
410 # One checkpoint should always exist
411 if self.file_exists(path) and not self.list_checkpoints(path):
411 if self.file_exists(path) and not self.list_checkpoints(path):
412 self.create_checkpoint(path)
412 self.create_checkpoint(path)
413
413
414 os_path = self._get_os_path(path)
414 os_path = self._get_os_path(path)
415 self.log.debug("Saving %s", os_path)
415 self.log.debug("Saving %s", os_path)
416 try:
416 try:
417 if model['type'] == 'notebook':
417 if model['type'] == 'notebook':
418 self._save_notebook(os_path, model, path)
418 self._save_notebook(os_path, model, path)
419 elif model['type'] == 'file':
419 elif model['type'] == 'file':
420 self._save_file(os_path, model, path)
420 self._save_file(os_path, model, path)
421 elif model['type'] == 'directory':
421 elif model['type'] == 'directory':
422 self._save_directory(os_path, model, path)
422 self._save_directory(os_path, model, path)
423 else:
423 else:
424 raise web.HTTPError(400, "Unhandled contents type: %s" % model['type'])
424 raise web.HTTPError(400, "Unhandled contents type: %s" % model['type'])
425 except web.HTTPError:
425 except web.HTTPError:
426 raise
426 raise
427 except Exception as e:
427 except Exception as e:
428 self.log.error(u'Error while saving file: %s %s', path, e, exc_info=True)
428 self.log.error(u'Error while saving file: %s %s', path, e, exc_info=True)
429 raise web.HTTPError(500, u'Unexpected error while saving file: %s %s' % (path, e))
429 raise web.HTTPError(500, u'Unexpected error while saving file: %s %s' % (path, e))
430
430
431 validation_message = None
431 validation_message = None
432 if model['type'] == 'notebook':
432 if model['type'] == 'notebook':
433 self.validate_notebook_model(model)
433 self.validate_notebook_model(model)
434 validation_message = model.get('message', None)
434 validation_message = model.get('message', None)
435
435
436 model = self.get(path, content=False)
436 model = self.get(path, content=False)
437 if validation_message:
437 if validation_message:
438 model['message'] = validation_message
438 model['message'] = validation_message
439 return model
439 return model
440
440
441 def update(self, model, path):
441 def update(self, model, path):
442 """Update the file's path
442 """Update the file's path
443
443
444 For use in PATCH requests, to enable renaming a file without
444 For use in PATCH requests, to enable renaming a file without
445 re-uploading its contents. Only used for renaming at the moment.
445 re-uploading its contents. Only used for renaming at the moment.
446 """
446 """
447 path = path.strip('/')
447 path = path.strip('/')
448 new_path = model.get('path', path).strip('/')
448 new_path = model.get('path', path).strip('/')
449 if path != new_path:
449 if path != new_path:
450 self.rename(path, new_path)
450 self.rename(path, new_path)
451 model = self.get(new_path, content=False)
451 model = self.get(new_path, content=False)
452 return model
452 return model
453
453
454 def delete(self, path):
454 def delete(self, path):
455 """Delete file at path."""
455 """Delete file at path."""
456 path = path.strip('/')
456 path = path.strip('/')
457 os_path = self._get_os_path(path)
457 os_path = self._get_os_path(path)
458 rm = os.unlink
458 rm = os.unlink
459 if os.path.isdir(os_path):
459 if os.path.isdir(os_path):
460 listing = os.listdir(os_path)
460 listing = os.listdir(os_path)
461 # don't delete non-empty directories (checkpoints dir doesn't count)
461 # don't delete non-empty directories (checkpoints dir doesn't count)
462 if listing and listing != [self.checkpoint_dir]:
462 if listing and listing != [self.checkpoint_dir]:
463 raise web.HTTPError(400, u'Directory %s not empty' % os_path)
463 raise web.HTTPError(400, u'Directory %s not empty' % os_path)
464 elif not os.path.isfile(os_path):
464 elif not os.path.isfile(os_path):
465 raise web.HTTPError(404, u'File does not exist: %s' % os_path)
465 raise web.HTTPError(404, u'File does not exist: %s' % os_path)
466
466
467 # clear checkpoints
467 # clear checkpoints
468 for checkpoint in self.list_checkpoints(path):
468 for checkpoint in self.list_checkpoints(path):
469 checkpoint_id = checkpoint['id']
469 checkpoint_id = checkpoint['id']
470 cp_path = self.get_checkpoint_path(checkpoint_id, path)
470 cp_path = self.get_checkpoint_path(checkpoint_id, path)
471 if os.path.isfile(cp_path):
471 if os.path.isfile(cp_path):
472 self.log.debug("Unlinking checkpoint %s", cp_path)
472 self.log.debug("Unlinking checkpoint %s", cp_path)
473 with self.perm_to_403():
473 with self.perm_to_403():
474 rm(cp_path)
474 rm(cp_path)
475
475
476 if os.path.isdir(os_path):
476 if os.path.isdir(os_path):
477 self.log.debug("Removing directory %s", os_path)
477 self.log.debug("Removing directory %s", os_path)
478 with self.perm_to_403():
478 with self.perm_to_403():
479 shutil.rmtree(os_path)
479 shutil.rmtree(os_path)
480 else:
480 else:
481 self.log.debug("Unlinking file %s", os_path)
481 self.log.debug("Unlinking file %s", os_path)
482 with self.perm_to_403():
482 with self.perm_to_403():
483 rm(os_path)
483 rm(os_path)
484
484
485 def rename(self, old_path, new_path):
485 def rename(self, old_path, new_path):
486 """Rename a file."""
486 """Rename a file."""
487 old_path = old_path.strip('/')
487 old_path = old_path.strip('/')
488 new_path = new_path.strip('/')
488 new_path = new_path.strip('/')
489 if new_path == old_path:
489 if new_path == old_path:
490 return
490 return
491
491
492 new_os_path = self._get_os_path(new_path)
492 new_os_path = self._get_os_path(new_path)
493 old_os_path = self._get_os_path(old_path)
493 old_os_path = self._get_os_path(old_path)
494
494
495 # Should we proceed with the move?
495 # Should we proceed with the move?
496 if os.path.exists(new_os_path):
496 if os.path.exists(new_os_path):
497 raise web.HTTPError(409, u'File already exists: %s' % new_path)
497 raise web.HTTPError(409, u'File already exists: %s' % new_path)
498
498
499 # Move the file
499 # Move the file
500 try:
500 try:
501 with self.perm_to_403():
501 with self.perm_to_403():
502 shutil.move(old_os_path, new_os_path)
502 shutil.move(old_os_path, new_os_path)
503 except web.HTTPError:
503 except web.HTTPError:
504 raise
504 raise
505 except Exception as e:
505 except Exception as e:
506 raise web.HTTPError(500, u'Unknown error renaming file: %s %s' % (old_path, e))
506 raise web.HTTPError(500, u'Unknown error renaming file: %s %s' % (old_path, e))
507
507
508 # Move the checkpoints
508 # Move the checkpoints
509 old_checkpoints = self.list_checkpoints(old_path)
509 old_checkpoints = self.list_checkpoints(old_path)
510 for cp in old_checkpoints:
510 for cp in old_checkpoints:
511 checkpoint_id = cp['id']
511 checkpoint_id = cp['id']
512 old_cp_path = self.get_checkpoint_path(checkpoint_id, old_path)
512 old_cp_path = self.get_checkpoint_path(checkpoint_id, old_path)
513 new_cp_path = self.get_checkpoint_path(checkpoint_id, new_path)
513 new_cp_path = self.get_checkpoint_path(checkpoint_id, new_path)
514 if os.path.isfile(old_cp_path):
514 if os.path.isfile(old_cp_path):
515 self.log.debug("Renaming checkpoint %s -> %s", old_cp_path, new_cp_path)
515 self.log.debug("Renaming checkpoint %s -> %s", old_cp_path, new_cp_path)
516 with self.perm_to_403():
516 with self.perm_to_403():
517 shutil.move(old_cp_path, new_cp_path)
517 shutil.move(old_cp_path, new_cp_path)
518
518
519 # Checkpoint-related utilities
519 # Checkpoint-related utilities
520
520
521 def get_checkpoint_path(self, checkpoint_id, path):
521 def get_checkpoint_path(self, checkpoint_id, path):
522 """find the path to a checkpoint"""
522 """find the path to a checkpoint"""
523 path = path.strip('/')
523 path = path.strip('/')
524 parent, name = ('/' + path).rsplit('/', 1)
524 parent, name = ('/' + path).rsplit('/', 1)
525 parent = parent.strip('/')
525 parent = parent.strip('/')
526 basename, ext = os.path.splitext(name)
526 basename, ext = os.path.splitext(name)
527 filename = u"{name}-{checkpoint_id}{ext}".format(
527 filename = u"{name}-{checkpoint_id}{ext}".format(
528 name=basename,
528 name=basename,
529 checkpoint_id=checkpoint_id,
529 checkpoint_id=checkpoint_id,
530 ext=ext,
530 ext=ext,
531 )
531 )
532 os_path = self._get_os_path(path=parent)
532 os_path = self._get_os_path(path=parent)
533 cp_dir = os.path.join(os_path, self.checkpoint_dir)
533 cp_dir = os.path.join(os_path, self.checkpoint_dir)
534 with self.perm_to_403():
534 with self.perm_to_403():
535 ensure_dir_exists(cp_dir)
535 ensure_dir_exists(cp_dir)
536 cp_path = os.path.join(cp_dir, filename)
536 cp_path = os.path.join(cp_dir, filename)
537 return cp_path
537 return cp_path
538
538
539 def get_checkpoint_model(self, checkpoint_id, path):
539 def get_checkpoint_model(self, checkpoint_id, path):
540 """construct the info dict for a given checkpoint"""
540 """construct the info dict for a given checkpoint"""
541 path = path.strip('/')
541 path = path.strip('/')
542 cp_path = self.get_checkpoint_path(checkpoint_id, path)
542 cp_path = self.get_checkpoint_path(checkpoint_id, path)
543 stats = os.stat(cp_path)
543 stats = os.stat(cp_path)
544 last_modified = tz.utcfromtimestamp(stats.st_mtime)
544 last_modified = tz.utcfromtimestamp(stats.st_mtime)
545 info = dict(
545 info = dict(
546 id = checkpoint_id,
546 id = checkpoint_id,
547 last_modified = last_modified,
547 last_modified = last_modified,
548 )
548 )
549 return info
549 return info
550
550
551 # public checkpoint API
551 # public checkpoint API
552
552
553 def create_checkpoint(self, path):
553 def create_checkpoint(self, path):
554 """Create a checkpoint from the current state of a file"""
554 """Create a checkpoint from the current state of a file"""
555 path = path.strip('/')
555 path = path.strip('/')
556 if not self.file_exists(path):
556 if not self.file_exists(path):
557 raise web.HTTPError(404)
557 raise web.HTTPError(404)
558 src_path = self._get_os_path(path)
558 src_path = self._get_os_path(path)
559 # only the one checkpoint ID:
559 # only the one checkpoint ID:
560 checkpoint_id = u"checkpoint"
560 checkpoint_id = u"checkpoint"
561 cp_path = self.get_checkpoint_path(checkpoint_id, path)
561 cp_path = self.get_checkpoint_path(checkpoint_id, path)
562 self.log.debug("creating checkpoint for %s", path)
562 self.log.debug("creating checkpoint for %s", path)
563 with self.perm_to_403():
563 with self.perm_to_403():
564 self._copy(src_path, cp_path)
564 self._copy(src_path, cp_path)
565
565
566 # return the checkpoint info
566 # return the checkpoint info
567 return self.get_checkpoint_model(checkpoint_id, path)
567 return self.get_checkpoint_model(checkpoint_id, path)
568
568
569 def list_checkpoints(self, path):
569 def list_checkpoints(self, path):
570 """list the checkpoints for a given file
570 """list the checkpoints for a given file
571
571
572 This contents manager currently only supports one checkpoint per file.
572 This contents manager currently only supports one checkpoint per file.
573 """
573 """
574 path = path.strip('/')
574 path = path.strip('/')
575 checkpoint_id = "checkpoint"
575 checkpoint_id = "checkpoint"
576 os_path = self.get_checkpoint_path(checkpoint_id, path)
576 os_path = self.get_checkpoint_path(checkpoint_id, path)
577 if not os.path.exists(os_path):
577 if not os.path.exists(os_path):
578 return []
578 return []
579 else:
579 else:
580 return [self.get_checkpoint_model(checkpoint_id, path)]
580 return [self.get_checkpoint_model(checkpoint_id, path)]
581
581
582
582
583 def restore_checkpoint(self, checkpoint_id, path):
583 def restore_checkpoint(self, checkpoint_id, path):
584 """restore a file to a checkpointed state"""
584 """restore a file to a checkpointed state"""
585 path = path.strip('/')
585 path = path.strip('/')
586 self.log.info("restoring %s from checkpoint %s", path, checkpoint_id)
586 self.log.info("restoring %s from checkpoint %s", path, checkpoint_id)
587 nb_path = self._get_os_path(path)
587 nb_path = self._get_os_path(path)
588 cp_path = self.get_checkpoint_path(checkpoint_id, path)
588 cp_path = self.get_checkpoint_path(checkpoint_id, path)
589 if not os.path.isfile(cp_path):
589 if not os.path.isfile(cp_path):
590 self.log.debug("checkpoint file does not exist: %s", cp_path)
590 self.log.debug("checkpoint file does not exist: %s", cp_path)
591 raise web.HTTPError(404,
591 raise web.HTTPError(404,
592 u'checkpoint does not exist: %s@%s' % (path, checkpoint_id)
592 u'checkpoint does not exist: %s@%s' % (path, checkpoint_id)
593 )
593 )
594 # ensure notebook is readable (never restore from an unreadable notebook)
594 # ensure notebook is readable (never restore from an unreadable notebook)
595 if cp_path.endswith('.ipynb'):
595 if cp_path.endswith('.ipynb'):
596 with self.open(cp_path, 'r', encoding='utf-8') as f:
596 with self.open(cp_path, 'r', encoding='utf-8') as f:
597 nbformat.read(f, as_version=4)
597 nbformat.read(f, as_version=4)
598 self.log.debug("copying %s -> %s", cp_path, nb_path)
598 self.log.debug("copying %s -> %s", cp_path, nb_path)
599 with self.perm_to_403():
599 with self.perm_to_403():
600 self._copy(cp_path, nb_path)
600 self._copy(cp_path, nb_path)
601
601
602 def delete_checkpoint(self, checkpoint_id, path):
602 def delete_checkpoint(self, checkpoint_id, path):
603 """delete a file's checkpoint"""
603 """delete a file's checkpoint"""
604 path = path.strip('/')
604 path = path.strip('/')
605 cp_path = self.get_checkpoint_path(checkpoint_id, path)
605 cp_path = self.get_checkpoint_path(checkpoint_id, path)
606 if not os.path.isfile(cp_path):
606 if not os.path.isfile(cp_path):
607 raise web.HTTPError(404,
607 raise web.HTTPError(404,
608 u'Checkpoint does not exist: %s@%s' % (path, checkpoint_id)
608 u'Checkpoint does not exist: %s@%s' % (path, checkpoint_id)
609 )
609 )
610 self.log.debug("unlinking %s", cp_path)
610 self.log.debug("unlinking %s", cp_path)
611 os.unlink(cp_path)
611 os.unlink(cp_path)
612
612
613 def info_string(self):
613 def info_string(self):
614 return "Serving notebooks from local directory: %s" % self.root_dir
614 return "Serving notebooks from local directory: %s" % self.root_dir
615
615
616 def get_kernel_path(self, path, model=None):
616 def get_kernel_path(self, path, model=None):
617 """Return the initial API path of a kernel associated with a given notebook"""
617 """Return the initial API path of a kernel associated with a given notebook"""
618 if '/' in path:
618 if '/' in path:
619 parent_dir = path.rsplit('/', 1)[0]
619 parent_dir = path.rsplit('/', 1)[0]
620 else:
620 else:
621 parent_dir = ''
621 parent_dir = ''
622 return parent_dir
622 return parent_dir
@@ -1,266 +1,266 b''
1 """Tornado handlers for the contents web service."""
1 """Tornado handlers for the contents web service."""
2
2
3 # Copyright (c) IPython Development Team.
3 # Copyright (c) IPython Development Team.
4 # Distributed under the terms of the Modified BSD License.
4 # Distributed under the terms of the Modified BSD License.
5
5
6 import json
6 import json
7
7
8 from tornado import web
8 from tornado import web
9
9
10 from IPython.html.utils import url_path_join, url_escape
10 from IPython.html.utils import url_path_join, url_escape
11 from IPython.utils.jsonutil import date_default
11 from IPython.utils.jsonutil import date_default
12
12
13 from IPython.html.base.handlers import (
13 from IPython.html.base.handlers import (
14 IPythonHandler, json_errors, path_regex,
14 IPythonHandler, json_errors, path_regex,
15 )
15 )
16
16
17
17
18 def sort_key(model):
18 def sort_key(model):
19 """key function for case-insensitive sort by name and type"""
19 """key function for case-insensitive sort by name and type"""
20 iname = model['name'].lower()
20 iname = model['name'].lower()
21 type_key = {
21 type_key = {
22 'directory' : '0',
22 'directory' : '0',
23 'notebook' : '1',
23 'notebook' : '1',
24 'file' : '2',
24 'file' : '2',
25 }.get(model['type'], '9')
25 }.get(model['type'], '9')
26 return u'%s%s' % (type_key, iname)
26 return u'%s%s' % (type_key, iname)
27
27
28 class ContentsHandler(IPythonHandler):
28 class ContentsHandler(IPythonHandler):
29
29
30 SUPPORTED_METHODS = (u'GET', u'PUT', u'PATCH', u'POST', u'DELETE')
30 SUPPORTED_METHODS = (u'GET', u'PUT', u'PATCH', u'POST', u'DELETE')
31
31
32 def location_url(self, path):
32 def location_url(self, path):
33 """Return the full URL location of a file.
33 """Return the full URL location of a file.
34
34
35 Parameters
35 Parameters
36 ----------
36 ----------
37 path : unicode
37 path : unicode
38 The API path of the file, such as "foo/bar.txt".
38 The API path of the file, such as "foo/bar.txt".
39 """
39 """
40 return url_escape(url_path_join(
40 return url_escape(url_path_join(
41 self.base_url, 'api', 'contents', path
41 self.base_url, 'api', 'contents', path
42 ))
42 ))
43
43
44 def _finish_model(self, model, location=True):
44 def _finish_model(self, model, location=True):
45 """Finish a JSON request with a model, setting relevant headers, etc."""
45 """Finish a JSON request with a model, setting relevant headers, etc."""
46 if location:
46 if location:
47 location = self.location_url(model['path'])
47 location = self.location_url(model['path'])
48 self.set_header('Location', location)
48 self.set_header('Location', location)
49 self.set_header('Last-Modified', model['last_modified'])
49 self.set_header('Last-Modified', model['last_modified'])
50 self.set_header('Content-Type', 'application/json')
50 self.set_header('Content-Type', 'application/json')
51 self.finish(json.dumps(model, default=date_default))
51 self.finish(json.dumps(model, default=date_default))
52
52
53 @web.authenticated
53 @web.authenticated
54 @json_errors
54 @json_errors
55 def get(self, path=''):
55 def get(self, path=''):
56 """Return a model for a file or directory.
56 """Return a model for a file or directory.
57
57
58 A directory model contains a list of models (without content)
58 A directory model contains a list of models (without content)
59 of the files and directories it contains.
59 of the files and directories it contains.
60 """
60 """
61 path = path or ''
61 path = path or ''
62 type_ = self.get_query_argument('type', default=None)
62 type = self.get_query_argument('type', default=None)
63 if type_ not in {None, 'directory', 'file', 'notebook'}:
63 if type not in {None, 'directory', 'file', 'notebook'}:
64 raise web.HTTPError(400, u'Type %r is invalid' % type_)
64 raise web.HTTPError(400, u'Type %r is invalid' % type)
65
65
66 format = self.get_query_argument('format', default=None)
66 format = self.get_query_argument('format', default=None)
67 if format not in {None, 'text', 'base64'}:
67 if format not in {None, 'text', 'base64'}:
68 raise web.HTTPError(400, u'Format %r is invalid' % format)
68 raise web.HTTPError(400, u'Format %r is invalid' % format)
69
69
70 model = self.contents_manager.get(path=path, type_=type_, format=format)
70 model = self.contents_manager.get(path=path, type=type, format=format)
71 if model['type'] == 'directory':
71 if model['type'] == 'directory':
72 # group listing by type, then by name (case-insensitive)
72 # group listing by type, then by name (case-insensitive)
73 # FIXME: sorting should be done in the frontends
73 # FIXME: sorting should be done in the frontends
74 model['content'].sort(key=sort_key)
74 model['content'].sort(key=sort_key)
75 self._finish_model(model, location=False)
75 self._finish_model(model, location=False)
76
76
77 @web.authenticated
77 @web.authenticated
78 @json_errors
78 @json_errors
79 def patch(self, path=''):
79 def patch(self, path=''):
80 """PATCH renames a file or directory without re-uploading content."""
80 """PATCH renames a file or directory without re-uploading content."""
81 cm = self.contents_manager
81 cm = self.contents_manager
82 model = self.get_json_body()
82 model = self.get_json_body()
83 if model is None:
83 if model is None:
84 raise web.HTTPError(400, u'JSON body missing')
84 raise web.HTTPError(400, u'JSON body missing')
85 model = cm.update(model, path)
85 model = cm.update(model, path)
86 self._finish_model(model)
86 self._finish_model(model)
87
87
88 def _copy(self, copy_from, copy_to=None):
88 def _copy(self, copy_from, copy_to=None):
89 """Copy a file, optionally specifying a target directory."""
89 """Copy a file, optionally specifying a target directory."""
90 self.log.info(u"Copying {copy_from} to {copy_to}".format(
90 self.log.info(u"Copying {copy_from} to {copy_to}".format(
91 copy_from=copy_from,
91 copy_from=copy_from,
92 copy_to=copy_to or '',
92 copy_to=copy_to or '',
93 ))
93 ))
94 model = self.contents_manager.copy(copy_from, copy_to)
94 model = self.contents_manager.copy(copy_from, copy_to)
95 self.set_status(201)
95 self.set_status(201)
96 self._finish_model(model)
96 self._finish_model(model)
97
97
98 def _upload(self, model, path):
98 def _upload(self, model, path):
99 """Handle upload of a new file to path"""
99 """Handle upload of a new file to path"""
100 self.log.info(u"Uploading file to %s", path)
100 self.log.info(u"Uploading file to %s", path)
101 model = self.contents_manager.new(model, path)
101 model = self.contents_manager.new(model, path)
102 self.set_status(201)
102 self.set_status(201)
103 self._finish_model(model)
103 self._finish_model(model)
104
104
105 def _new_untitled(self, path, type='', ext=''):
105 def _new_untitled(self, path, type='', ext=''):
106 """Create a new, empty untitled entity"""
106 """Create a new, empty untitled entity"""
107 self.log.info(u"Creating new %s in %s", type or 'file', path)
107 self.log.info(u"Creating new %s in %s", type or 'file', path)
108 model = self.contents_manager.new_untitled(path=path, type=type, ext=ext)
108 model = self.contents_manager.new_untitled(path=path, type=type, ext=ext)
109 self.set_status(201)
109 self.set_status(201)
110 self._finish_model(model)
110 self._finish_model(model)
111
111
112 def _save(self, model, path):
112 def _save(self, model, path):
113 """Save an existing file."""
113 """Save an existing file."""
114 self.log.info(u"Saving file at %s", path)
114 self.log.info(u"Saving file at %s", path)
115 model = self.contents_manager.save(model, path)
115 model = self.contents_manager.save(model, path)
116 self._finish_model(model)
116 self._finish_model(model)
117
117
118 @web.authenticated
118 @web.authenticated
119 @json_errors
119 @json_errors
120 def post(self, path=''):
120 def post(self, path=''):
121 """Create a new file in the specified path.
121 """Create a new file in the specified path.
122
122
123 POST creates new files. The server always decides on the name.
123 POST creates new files. The server always decides on the name.
124
124
125 POST /api/contents/path
125 POST /api/contents/path
126 New untitled, empty file or directory.
126 New untitled, empty file or directory.
127 POST /api/contents/path
127 POST /api/contents/path
128 with body {"copy_from" : "/path/to/OtherNotebook.ipynb"}
128 with body {"copy_from" : "/path/to/OtherNotebook.ipynb"}
129 New copy of OtherNotebook in path
129 New copy of OtherNotebook in path
130 """
130 """
131
131
132 cm = self.contents_manager
132 cm = self.contents_manager
133
133
134 if cm.file_exists(path):
134 if cm.file_exists(path):
135 raise web.HTTPError(400, "Cannot POST to files, use PUT instead.")
135 raise web.HTTPError(400, "Cannot POST to files, use PUT instead.")
136
136
137 if not cm.dir_exists(path):
137 if not cm.dir_exists(path):
138 raise web.HTTPError(404, "No such directory: %s" % path)
138 raise web.HTTPError(404, "No such directory: %s" % path)
139
139
140 model = self.get_json_body()
140 model = self.get_json_body()
141
141
142 if model is not None:
142 if model is not None:
143 copy_from = model.get('copy_from')
143 copy_from = model.get('copy_from')
144 ext = model.get('ext', '')
144 ext = model.get('ext', '')
145 type = model.get('type', '')
145 type = model.get('type', '')
146 if copy_from:
146 if copy_from:
147 self._copy(copy_from, path)
147 self._copy(copy_from, path)
148 else:
148 else:
149 self._new_untitled(path, type=type, ext=ext)
149 self._new_untitled(path, type=type, ext=ext)
150 else:
150 else:
151 self._new_untitled(path)
151 self._new_untitled(path)
152
152
153 @web.authenticated
153 @web.authenticated
154 @json_errors
154 @json_errors
155 def put(self, path=''):
155 def put(self, path=''):
156 """Saves the file in the location specified by name and path.
156 """Saves the file in the location specified by name and path.
157
157
158 PUT is very similar to POST, but the requester specifies the name,
158 PUT is very similar to POST, but the requester specifies the name,
159 whereas with POST, the server picks the name.
159 whereas with POST, the server picks the name.
160
160
161 PUT /api/contents/path/Name.ipynb
161 PUT /api/contents/path/Name.ipynb
162 Save notebook at ``path/Name.ipynb``. Notebook structure is specified
162 Save notebook at ``path/Name.ipynb``. Notebook structure is specified
163 in `content` key of JSON request body. If content is not specified,
163 in `content` key of JSON request body. If content is not specified,
164 create a new empty notebook.
164 create a new empty notebook.
165 """
165 """
166 model = self.get_json_body()
166 model = self.get_json_body()
167 if model:
167 if model:
168 if model.get('copy_from'):
168 if model.get('copy_from'):
169 raise web.HTTPError(400, "Cannot copy with PUT, only POST")
169 raise web.HTTPError(400, "Cannot copy with PUT, only POST")
170 if self.contents_manager.file_exists(path):
170 if self.contents_manager.file_exists(path):
171 self._save(model, path)
171 self._save(model, path)
172 else:
172 else:
173 self._upload(model, path)
173 self._upload(model, path)
174 else:
174 else:
175 self._new_untitled(path)
175 self._new_untitled(path)
176
176
177 @web.authenticated
177 @web.authenticated
178 @json_errors
178 @json_errors
179 def delete(self, path=''):
179 def delete(self, path=''):
180 """delete a file in the given path"""
180 """delete a file in the given path"""
181 cm = self.contents_manager
181 cm = self.contents_manager
182 self.log.warn('delete %s', path)
182 self.log.warn('delete %s', path)
183 cm.delete(path)
183 cm.delete(path)
184 self.set_status(204)
184 self.set_status(204)
185 self.finish()
185 self.finish()
186
186
187
187
188 class CheckpointsHandler(IPythonHandler):
188 class CheckpointsHandler(IPythonHandler):
189
189
190 SUPPORTED_METHODS = ('GET', 'POST')
190 SUPPORTED_METHODS = ('GET', 'POST')
191
191
192 @web.authenticated
192 @web.authenticated
193 @json_errors
193 @json_errors
194 def get(self, path=''):
194 def get(self, path=''):
195 """get lists checkpoints for a file"""
195 """get lists checkpoints for a file"""
196 cm = self.contents_manager
196 cm = self.contents_manager
197 checkpoints = cm.list_checkpoints(path)
197 checkpoints = cm.list_checkpoints(path)
198 data = json.dumps(checkpoints, default=date_default)
198 data = json.dumps(checkpoints, default=date_default)
199 self.finish(data)
199 self.finish(data)
200
200
201 @web.authenticated
201 @web.authenticated
202 @json_errors
202 @json_errors
203 def post(self, path=''):
203 def post(self, path=''):
204 """post creates a new checkpoint"""
204 """post creates a new checkpoint"""
205 cm = self.contents_manager
205 cm = self.contents_manager
206 checkpoint = cm.create_checkpoint(path)
206 checkpoint = cm.create_checkpoint(path)
207 data = json.dumps(checkpoint, default=date_default)
207 data = json.dumps(checkpoint, default=date_default)
208 location = url_path_join(self.base_url, 'api/contents',
208 location = url_path_join(self.base_url, 'api/contents',
209 path, 'checkpoints', checkpoint['id'])
209 path, 'checkpoints', checkpoint['id'])
210 self.set_header('Location', url_escape(location))
210 self.set_header('Location', url_escape(location))
211 self.set_status(201)
211 self.set_status(201)
212 self.finish(data)
212 self.finish(data)
213
213
214
214
215 class ModifyCheckpointsHandler(IPythonHandler):
215 class ModifyCheckpointsHandler(IPythonHandler):
216
216
217 SUPPORTED_METHODS = ('POST', 'DELETE')
217 SUPPORTED_METHODS = ('POST', 'DELETE')
218
218
219 @web.authenticated
219 @web.authenticated
220 @json_errors
220 @json_errors
221 def post(self, path, checkpoint_id):
221 def post(self, path, checkpoint_id):
222 """post restores a file from a checkpoint"""
222 """post restores a file from a checkpoint"""
223 cm = self.contents_manager
223 cm = self.contents_manager
224 cm.restore_checkpoint(checkpoint_id, path)
224 cm.restore_checkpoint(checkpoint_id, path)
225 self.set_status(204)
225 self.set_status(204)
226 self.finish()
226 self.finish()
227
227
228 @web.authenticated
228 @web.authenticated
229 @json_errors
229 @json_errors
230 def delete(self, path, checkpoint_id):
230 def delete(self, path, checkpoint_id):
231 """delete clears a checkpoint for a given file"""
231 """delete clears a checkpoint for a given file"""
232 cm = self.contents_manager
232 cm = self.contents_manager
233 cm.delete_checkpoint(checkpoint_id, path)
233 cm.delete_checkpoint(checkpoint_id, path)
234 self.set_status(204)
234 self.set_status(204)
235 self.finish()
235 self.finish()
236
236
237
237
238 class NotebooksRedirectHandler(IPythonHandler):
238 class NotebooksRedirectHandler(IPythonHandler):
239 """Redirect /api/notebooks to /api/contents"""
239 """Redirect /api/notebooks to /api/contents"""
240 SUPPORTED_METHODS = ('GET', 'PUT', 'PATCH', 'POST', 'DELETE')
240 SUPPORTED_METHODS = ('GET', 'PUT', 'PATCH', 'POST', 'DELETE')
241
241
242 def get(self, path):
242 def get(self, path):
243 self.log.warn("/api/notebooks is deprecated, use /api/contents")
243 self.log.warn("/api/notebooks is deprecated, use /api/contents")
244 self.redirect(url_path_join(
244 self.redirect(url_path_join(
245 self.base_url,
245 self.base_url,
246 'api/contents',
246 'api/contents',
247 path
247 path
248 ))
248 ))
249
249
250 put = patch = post = delete = get
250 put = patch = post = delete = get
251
251
252
252
253 #-----------------------------------------------------------------------------
253 #-----------------------------------------------------------------------------
254 # URL to handler mappings
254 # URL to handler mappings
255 #-----------------------------------------------------------------------------
255 #-----------------------------------------------------------------------------
256
256
257
257
258 _checkpoint_id_regex = r"(?P<checkpoint_id>[\w-]+)"
258 _checkpoint_id_regex = r"(?P<checkpoint_id>[\w-]+)"
259
259
260 default_handlers = [
260 default_handlers = [
261 (r"/api/contents%s/checkpoints" % path_regex, CheckpointsHandler),
261 (r"/api/contents%s/checkpoints" % path_regex, CheckpointsHandler),
262 (r"/api/contents%s/checkpoints/%s" % (path_regex, _checkpoint_id_regex),
262 (r"/api/contents%s/checkpoints/%s" % (path_regex, _checkpoint_id_regex),
263 ModifyCheckpointsHandler),
263 ModifyCheckpointsHandler),
264 (r"/api/contents%s" % path_regex, ContentsHandler),
264 (r"/api/contents%s" % path_regex, ContentsHandler),
265 (r"/api/notebooks/?(.*)", NotebooksRedirectHandler),
265 (r"/api/notebooks/?(.*)", NotebooksRedirectHandler),
266 ]
266 ]
@@ -1,384 +1,384 b''
1 """A base class for contents managers."""
1 """A base class for contents managers."""
2
2
3 # Copyright (c) IPython Development Team.
3 # Copyright (c) IPython Development Team.
4 # Distributed under the terms of the Modified BSD License.
4 # Distributed under the terms of the Modified BSD License.
5
5
6 from fnmatch import fnmatch
6 from fnmatch import fnmatch
7 import itertools
7 import itertools
8 import json
8 import json
9 import os
9 import os
10 import re
10 import re
11
11
12 from tornado.web import HTTPError
12 from tornado.web import HTTPError
13
13
14 from IPython.config.configurable import LoggingConfigurable
14 from IPython.config.configurable import LoggingConfigurable
15 from IPython.nbformat import sign, validate, ValidationError
15 from IPython.nbformat import sign, validate, ValidationError
16 from IPython.nbformat.v4 import new_notebook
16 from IPython.nbformat.v4 import new_notebook
17 from IPython.utils.traitlets import Instance, Unicode, List
17 from IPython.utils.traitlets import Instance, Unicode, List
18
18
19 copy_pat = re.compile(r'\-Copy\d*\.')
19 copy_pat = re.compile(r'\-Copy\d*\.')
20
20
21 class ContentsManager(LoggingConfigurable):
21 class ContentsManager(LoggingConfigurable):
22 """Base class for serving files and directories.
22 """Base class for serving files and directories.
23
23
24 This serves any text or binary file,
24 This serves any text or binary file,
25 as well as directories,
25 as well as directories,
26 with special handling for JSON notebook documents.
26 with special handling for JSON notebook documents.
27
27
28 Most APIs take a path argument,
28 Most APIs take a path argument,
29 which is always an API-style unicode path,
29 which is always an API-style unicode path,
30 and always refers to a directory.
30 and always refers to a directory.
31
31
32 - unicode, not url-escaped
32 - unicode, not url-escaped
33 - '/'-separated
33 - '/'-separated
34 - leading and trailing '/' will be stripped
34 - leading and trailing '/' will be stripped
35 - if unspecified, path defaults to '',
35 - if unspecified, path defaults to '',
36 indicating the root path.
36 indicating the root path.
37
37
38 """
38 """
39
39
40 notary = Instance(sign.NotebookNotary)
40 notary = Instance(sign.NotebookNotary)
41 def _notary_default(self):
41 def _notary_default(self):
42 return sign.NotebookNotary(parent=self)
42 return sign.NotebookNotary(parent=self)
43
43
44 hide_globs = List(Unicode, [
44 hide_globs = List(Unicode, [
45 u'__pycache__', '*.pyc', '*.pyo',
45 u'__pycache__', '*.pyc', '*.pyo',
46 '.DS_Store', '*.so', '*.dylib', '*~',
46 '.DS_Store', '*.so', '*.dylib', '*~',
47 ], config=True, help="""
47 ], config=True, help="""
48 Glob patterns to hide in file and directory listings.
48 Glob patterns to hide in file and directory listings.
49 """)
49 """)
50
50
51 untitled_notebook = Unicode("Untitled", config=True,
51 untitled_notebook = Unicode("Untitled", config=True,
52 help="The base name used when creating untitled notebooks."
52 help="The base name used when creating untitled notebooks."
53 )
53 )
54
54
55 untitled_file = Unicode("untitled", config=True,
55 untitled_file = Unicode("untitled", config=True,
56 help="The base name used when creating untitled files."
56 help="The base name used when creating untitled files."
57 )
57 )
58
58
59 untitled_directory = Unicode("Untitled Folder", config=True,
59 untitled_directory = Unicode("Untitled Folder", config=True,
60 help="The base name used when creating untitled directories."
60 help="The base name used when creating untitled directories."
61 )
61 )
62
62
63 # ContentsManager API part 1: methods that must be
63 # ContentsManager API part 1: methods that must be
64 # implemented in subclasses.
64 # implemented in subclasses.
65
65
66 def dir_exists(self, path):
66 def dir_exists(self, path):
67 """Does the API-style path (directory) actually exist?
67 """Does the API-style path (directory) actually exist?
68
68
69 Like os.path.isdir
69 Like os.path.isdir
70
70
71 Override this method in subclasses.
71 Override this method in subclasses.
72
72
73 Parameters
73 Parameters
74 ----------
74 ----------
75 path : string
75 path : string
76 The path to check
76 The path to check
77
77
78 Returns
78 Returns
79 -------
79 -------
80 exists : bool
80 exists : bool
81 Whether the path does indeed exist.
81 Whether the path does indeed exist.
82 """
82 """
83 raise NotImplementedError
83 raise NotImplementedError
84
84
85 def is_hidden(self, path):
85 def is_hidden(self, path):
86 """Does the API style path correspond to a hidden directory or file?
86 """Does the API style path correspond to a hidden directory or file?
87
87
88 Parameters
88 Parameters
89 ----------
89 ----------
90 path : string
90 path : string
91 The path to check. This is an API path (`/` separated,
91 The path to check. This is an API path (`/` separated,
92 relative to root dir).
92 relative to root dir).
93
93
94 Returns
94 Returns
95 -------
95 -------
96 hidden : bool
96 hidden : bool
97 Whether the path is hidden.
97 Whether the path is hidden.
98
98
99 """
99 """
100 raise NotImplementedError
100 raise NotImplementedError
101
101
102 def file_exists(self, path=''):
102 def file_exists(self, path=''):
103 """Does a file exist at the given path?
103 """Does a file exist at the given path?
104
104
105 Like os.path.isfile
105 Like os.path.isfile
106
106
107 Override this method in subclasses.
107 Override this method in subclasses.
108
108
109 Parameters
109 Parameters
110 ----------
110 ----------
111 name : string
111 name : string
112 The name of the file you are checking.
112 The name of the file you are checking.
113 path : string
113 path : string
114 The relative path to the file's directory (with '/' as separator)
114 The relative path to the file's directory (with '/' as separator)
115
115
116 Returns
116 Returns
117 -------
117 -------
118 exists : bool
118 exists : bool
119 Whether the file exists.
119 Whether the file exists.
120 """
120 """
121 raise NotImplementedError('must be implemented in a subclass')
121 raise NotImplementedError('must be implemented in a subclass')
122
122
123 def exists(self, path):
123 def exists(self, path):
124 """Does a file or directory exist at the given path?
124 """Does a file or directory exist at the given path?
125
125
126 Like os.path.exists
126 Like os.path.exists
127
127
128 Parameters
128 Parameters
129 ----------
129 ----------
130 path : string
130 path : string
131 The relative path to the file's directory (with '/' as separator)
131 The relative path to the file's directory (with '/' as separator)
132
132
133 Returns
133 Returns
134 -------
134 -------
135 exists : bool
135 exists : bool
136 Whether the target exists.
136 Whether the target exists.
137 """
137 """
138 return self.file_exists(path) or self.dir_exists(path)
138 return self.file_exists(path) or self.dir_exists(path)
139
139
140 def get(self, path, content=True, type_=None, format=None):
140 def get(self, path, content=True, type=None, format=None):
141 """Get the model of a file or directory with or without content."""
141 """Get the model of a file or directory with or without content."""
142 raise NotImplementedError('must be implemented in a subclass')
142 raise NotImplementedError('must be implemented in a subclass')
143
143
144 def save(self, model, path):
144 def save(self, model, path):
145 """Save the file or directory and return the model with no content."""
145 """Save the file or directory and return the model with no content."""
146 raise NotImplementedError('must be implemented in a subclass')
146 raise NotImplementedError('must be implemented in a subclass')
147
147
148 def update(self, model, path):
148 def update(self, model, path):
149 """Update the file or directory and return the model with no content.
149 """Update the file or directory and return the model with no content.
150
150
151 For use in PATCH requests, to enable renaming a file without
151 For use in PATCH requests, to enable renaming a file without
152 re-uploading its contents. Only used for renaming at the moment.
152 re-uploading its contents. Only used for renaming at the moment.
153 """
153 """
154 raise NotImplementedError('must be implemented in a subclass')
154 raise NotImplementedError('must be implemented in a subclass')
155
155
156 def delete(self, path):
156 def delete(self, path):
157 """Delete file or directory by path."""
157 """Delete file or directory by path."""
158 raise NotImplementedError('must be implemented in a subclass')
158 raise NotImplementedError('must be implemented in a subclass')
159
159
160 def create_checkpoint(self, path):
160 def create_checkpoint(self, path):
161 """Create a checkpoint of the current state of a file
161 """Create a checkpoint of the current state of a file
162
162
163 Returns a checkpoint_id for the new checkpoint.
163 Returns a checkpoint_id for the new checkpoint.
164 """
164 """
165 raise NotImplementedError("must be implemented in a subclass")
165 raise NotImplementedError("must be implemented in a subclass")
166
166
167 def list_checkpoints(self, path):
167 def list_checkpoints(self, path):
168 """Return a list of checkpoints for a given file"""
168 """Return a list of checkpoints for a given file"""
169 return []
169 return []
170
170
171 def restore_checkpoint(self, checkpoint_id, path):
171 def restore_checkpoint(self, checkpoint_id, path):
172 """Restore a file from one of its checkpoints"""
172 """Restore a file from one of its checkpoints"""
173 raise NotImplementedError("must be implemented in a subclass")
173 raise NotImplementedError("must be implemented in a subclass")
174
174
175 def delete_checkpoint(self, checkpoint_id, path):
175 def delete_checkpoint(self, checkpoint_id, path):
176 """delete a checkpoint for a file"""
176 """delete a checkpoint for a file"""
177 raise NotImplementedError("must be implemented in a subclass")
177 raise NotImplementedError("must be implemented in a subclass")
178
178
179 # ContentsManager API part 2: methods that have useable default
179 # ContentsManager API part 2: methods that have useable default
180 # implementations, but can be overridden in subclasses.
180 # implementations, but can be overridden in subclasses.
181
181
182 def info_string(self):
182 def info_string(self):
183 return "Serving contents"
183 return "Serving contents"
184
184
185 def get_kernel_path(self, path, model=None):
185 def get_kernel_path(self, path, model=None):
186 """Return the API path for the kernel
186 """Return the API path for the kernel
187
187
188 KernelManagers can turn this value into a filesystem path,
188 KernelManagers can turn this value into a filesystem path,
189 or ignore it altogether.
189 or ignore it altogether.
190
190
191 The default value here will start kernels in the directory of the
191 The default value here will start kernels in the directory of the
192 notebook server. FileContentsManager overrides this to use the
192 notebook server. FileContentsManager overrides this to use the
193 directory containing the notebook.
193 directory containing the notebook.
194 """
194 """
195 return ''
195 return ''
196
196
197 def increment_filename(self, filename, path='', insert=''):
197 def increment_filename(self, filename, path='', insert=''):
198 """Increment a filename until it is unique.
198 """Increment a filename until it is unique.
199
199
200 Parameters
200 Parameters
201 ----------
201 ----------
202 filename : unicode
202 filename : unicode
203 The name of a file, including extension
203 The name of a file, including extension
204 path : unicode
204 path : unicode
205 The API path of the target's directory
205 The API path of the target's directory
206
206
207 Returns
207 Returns
208 -------
208 -------
209 name : unicode
209 name : unicode
210 A filename that is unique, based on the input filename.
210 A filename that is unique, based on the input filename.
211 """
211 """
212 path = path.strip('/')
212 path = path.strip('/')
213 basename, ext = os.path.splitext(filename)
213 basename, ext = os.path.splitext(filename)
214 for i in itertools.count():
214 for i in itertools.count():
215 if i:
215 if i:
216 insert_i = '{}{}'.format(insert, i)
216 insert_i = '{}{}'.format(insert, i)
217 else:
217 else:
218 insert_i = ''
218 insert_i = ''
219 name = u'{basename}{insert}{ext}'.format(basename=basename,
219 name = u'{basename}{insert}{ext}'.format(basename=basename,
220 insert=insert_i, ext=ext)
220 insert=insert_i, ext=ext)
221 if not self.exists(u'{}/{}'.format(path, name)):
221 if not self.exists(u'{}/{}'.format(path, name)):
222 break
222 break
223 return name
223 return name
224
224
225 def validate_notebook_model(self, model):
225 def validate_notebook_model(self, model):
226 """Add failed-validation message to model"""
226 """Add failed-validation message to model"""
227 try:
227 try:
228 validate(model['content'])
228 validate(model['content'])
229 except ValidationError as e:
229 except ValidationError as e:
230 model['message'] = u'Notebook Validation failed: {}:\n{}'.format(
230 model['message'] = u'Notebook Validation failed: {}:\n{}'.format(
231 e.message, json.dumps(e.instance, indent=1, default=lambda obj: '<UNKNOWN>'),
231 e.message, json.dumps(e.instance, indent=1, default=lambda obj: '<UNKNOWN>'),
232 )
232 )
233 return model
233 return model
234
234
235 def new_untitled(self, path='', type='', ext=''):
235 def new_untitled(self, path='', type='', ext=''):
236 """Create a new untitled file or directory in path
236 """Create a new untitled file or directory in path
237
237
238 path must be a directory
238 path must be a directory
239
239
240 File extension can be specified.
240 File extension can be specified.
241
241
242 Use `new` to create files with a fully specified path (including filename).
242 Use `new` to create files with a fully specified path (including filename).
243 """
243 """
244 path = path.strip('/')
244 path = path.strip('/')
245 if not self.dir_exists(path):
245 if not self.dir_exists(path):
246 raise HTTPError(404, 'No such directory: %s' % path)
246 raise HTTPError(404, 'No such directory: %s' % path)
247
247
248 model = {}
248 model = {}
249 if type:
249 if type:
250 model['type'] = type
250 model['type'] = type
251
251
252 if ext == '.ipynb':
252 if ext == '.ipynb':
253 model.setdefault('type', 'notebook')
253 model.setdefault('type', 'notebook')
254 else:
254 else:
255 model.setdefault('type', 'file')
255 model.setdefault('type', 'file')
256
256
257 insert = ''
257 insert = ''
258 if model['type'] == 'directory':
258 if model['type'] == 'directory':
259 untitled = self.untitled_directory
259 untitled = self.untitled_directory
260 insert = ' '
260 insert = ' '
261 elif model['type'] == 'notebook':
261 elif model['type'] == 'notebook':
262 untitled = self.untitled_notebook
262 untitled = self.untitled_notebook
263 ext = '.ipynb'
263 ext = '.ipynb'
264 elif model['type'] == 'file':
264 elif model['type'] == 'file':
265 untitled = self.untitled_file
265 untitled = self.untitled_file
266 else:
266 else:
267 raise HTTPError(400, "Unexpected model type: %r" % model['type'])
267 raise HTTPError(400, "Unexpected model type: %r" % model['type'])
268
268
269 name = self.increment_filename(untitled + ext, path, insert=insert)
269 name = self.increment_filename(untitled + ext, path, insert=insert)
270 path = u'{0}/{1}'.format(path, name)
270 path = u'{0}/{1}'.format(path, name)
271 return self.new(model, path)
271 return self.new(model, path)
272
272
273 def new(self, model=None, path=''):
273 def new(self, model=None, path=''):
274 """Create a new file or directory and return its model with no content.
274 """Create a new file or directory and return its model with no content.
275
275
276 To create a new untitled entity in a directory, use `new_untitled`.
276 To create a new untitled entity in a directory, use `new_untitled`.
277 """
277 """
278 path = path.strip('/')
278 path = path.strip('/')
279 if model is None:
279 if model is None:
280 model = {}
280 model = {}
281
281
282 if path.endswith('.ipynb'):
282 if path.endswith('.ipynb'):
283 model.setdefault('type', 'notebook')
283 model.setdefault('type', 'notebook')
284 else:
284 else:
285 model.setdefault('type', 'file')
285 model.setdefault('type', 'file')
286
286
287 # no content, not a directory, so fill out new-file model
287 # no content, not a directory, so fill out new-file model
288 if 'content' not in model and model['type'] != 'directory':
288 if 'content' not in model and model['type'] != 'directory':
289 if model['type'] == 'notebook':
289 if model['type'] == 'notebook':
290 model['content'] = new_notebook()
290 model['content'] = new_notebook()
291 model['format'] = 'json'
291 model['format'] = 'json'
292 else:
292 else:
293 model['content'] = ''
293 model['content'] = ''
294 model['type'] = 'file'
294 model['type'] = 'file'
295 model['format'] = 'text'
295 model['format'] = 'text'
296
296
297 model = self.save(model, path)
297 model = self.save(model, path)
298 return model
298 return model
299
299
300 def copy(self, from_path, to_path=None):
300 def copy(self, from_path, to_path=None):
301 """Copy an existing file and return its new model.
301 """Copy an existing file and return its new model.
302
302
303 If to_path not specified, it will be the parent directory of from_path.
303 If to_path not specified, it will be the parent directory of from_path.
304 If to_path is a directory, filename will increment `from_path-Copy#.ext`.
304 If to_path is a directory, filename will increment `from_path-Copy#.ext`.
305
305
306 from_path must be a full path to a file.
306 from_path must be a full path to a file.
307 """
307 """
308 path = from_path.strip('/')
308 path = from_path.strip('/')
309 if '/' in path:
309 if '/' in path:
310 from_dir, from_name = path.rsplit('/', 1)
310 from_dir, from_name = path.rsplit('/', 1)
311 else:
311 else:
312 from_dir = ''
312 from_dir = ''
313 from_name = path
313 from_name = path
314
314
315 model = self.get(path)
315 model = self.get(path)
316 model.pop('path', None)
316 model.pop('path', None)
317 model.pop('name', None)
317 model.pop('name', None)
318 if model['type'] == 'directory':
318 if model['type'] == 'directory':
319 raise HTTPError(400, "Can't copy directories")
319 raise HTTPError(400, "Can't copy directories")
320
320
321 if not to_path:
321 if not to_path:
322 to_path = from_dir
322 to_path = from_dir
323 if self.dir_exists(to_path):
323 if self.dir_exists(to_path):
324 name = copy_pat.sub(u'.', from_name)
324 name = copy_pat.sub(u'.', from_name)
325 to_name = self.increment_filename(name, to_path, insert='-Copy')
325 to_name = self.increment_filename(name, to_path, insert='-Copy')
326 to_path = u'{0}/{1}'.format(to_path, to_name)
326 to_path = u'{0}/{1}'.format(to_path, to_name)
327
327
328 model = self.save(model, to_path)
328 model = self.save(model, to_path)
329 return model
329 return model
330
330
331 def log_info(self):
331 def log_info(self):
332 self.log.info(self.info_string())
332 self.log.info(self.info_string())
333
333
334 def trust_notebook(self, path):
334 def trust_notebook(self, path):
335 """Explicitly trust a notebook
335 """Explicitly trust a notebook
336
336
337 Parameters
337 Parameters
338 ----------
338 ----------
339 path : string
339 path : string
340 The path of a notebook
340 The path of a notebook
341 """
341 """
342 model = self.get(path)
342 model = self.get(path)
343 nb = model['content']
343 nb = model['content']
344 self.log.warn("Trusting notebook %s", path)
344 self.log.warn("Trusting notebook %s", path)
345 self.notary.mark_cells(nb, True)
345 self.notary.mark_cells(nb, True)
346 self.save(model, path)
346 self.save(model, path)
347
347
348 def check_and_sign(self, nb, path=''):
348 def check_and_sign(self, nb, path=''):
349 """Check for trusted cells, and sign the notebook.
349 """Check for trusted cells, and sign the notebook.
350
350
351 Called as a part of saving notebooks.
351 Called as a part of saving notebooks.
352
352
353 Parameters
353 Parameters
354 ----------
354 ----------
355 nb : dict
355 nb : dict
356 The notebook dict
356 The notebook dict
357 path : string
357 path : string
358 The notebook's path (for logging)
358 The notebook's path (for logging)
359 """
359 """
360 if self.notary.check_cells(nb):
360 if self.notary.check_cells(nb):
361 self.notary.sign(nb)
361 self.notary.sign(nb)
362 else:
362 else:
363 self.log.warn("Saving untrusted notebook %s", path)
363 self.log.warn("Saving untrusted notebook %s", path)
364
364
365 def mark_trusted_cells(self, nb, path=''):
365 def mark_trusted_cells(self, nb, path=''):
366 """Mark cells as trusted if the notebook signature matches.
366 """Mark cells as trusted if the notebook signature matches.
367
367
368 Called as a part of loading notebooks.
368 Called as a part of loading notebooks.
369
369
370 Parameters
370 Parameters
371 ----------
371 ----------
372 nb : dict
372 nb : dict
373 The notebook object (in current nbformat)
373 The notebook object (in current nbformat)
374 path : string
374 path : string
375 The notebook's path (for logging)
375 The notebook's path (for logging)
376 """
376 """
377 trusted = self.notary.check_signature(nb)
377 trusted = self.notary.check_signature(nb)
378 if not trusted:
378 if not trusted:
379 self.log.warn("Notebook %s is not trusted", path)
379 self.log.warn("Notebook %s is not trusted", path)
380 self.notary.mark_cells(nb, trusted)
380 self.notary.mark_cells(nb, trusted)
381
381
382 def should_list(self, name):
382 def should_list(self, name):
383 """Should this file/directory name be displayed in a listing?"""
383 """Should this file/directory name be displayed in a listing?"""
384 return not any(fnmatch(name, glob) for glob in self.hide_globs)
384 return not any(fnmatch(name, glob) for glob in self.hide_globs)
@@ -1,521 +1,521 b''
1 # coding: utf-8
1 # coding: utf-8
2 """Test the contents webservice API."""
2 """Test the contents webservice API."""
3
3
4 import base64
4 import base64
5 import io
5 import io
6 import json
6 import json
7 import os
7 import os
8 import shutil
8 import shutil
9 from unicodedata import normalize
9 from unicodedata import normalize
10
10
11 pjoin = os.path.join
11 pjoin = os.path.join
12
12
13 import requests
13 import requests
14
14
15 from IPython.html.utils import url_path_join, url_escape
15 from IPython.html.utils import url_path_join, url_escape
16 from IPython.html.tests.launchnotebook import NotebookTestBase, assert_http_error
16 from IPython.html.tests.launchnotebook import NotebookTestBase, assert_http_error
17 from IPython.nbformat import read, write, from_dict
17 from IPython.nbformat import read, write, from_dict
18 from IPython.nbformat.v4 import (
18 from IPython.nbformat.v4 import (
19 new_notebook, new_markdown_cell,
19 new_notebook, new_markdown_cell,
20 )
20 )
21 from IPython.nbformat import v2
21 from IPython.nbformat import v2
22 from IPython.utils import py3compat
22 from IPython.utils import py3compat
23 from IPython.utils.data import uniq_stable
23 from IPython.utils.data import uniq_stable
24
24
25
25
26 def notebooks_only(dir_model):
26 def notebooks_only(dir_model):
27 return [nb for nb in dir_model['content'] if nb['type']=='notebook']
27 return [nb for nb in dir_model['content'] if nb['type']=='notebook']
28
28
29 def dirs_only(dir_model):
29 def dirs_only(dir_model):
30 return [x for x in dir_model['content'] if x['type']=='directory']
30 return [x for x in dir_model['content'] if x['type']=='directory']
31
31
32
32
33 class API(object):
33 class API(object):
34 """Wrapper for contents API calls."""
34 """Wrapper for contents API calls."""
35 def __init__(self, base_url):
35 def __init__(self, base_url):
36 self.base_url = base_url
36 self.base_url = base_url
37
37
38 def _req(self, verb, path, body=None, params=None):
38 def _req(self, verb, path, body=None, params=None):
39 response = requests.request(verb,
39 response = requests.request(verb,
40 url_path_join(self.base_url, 'api/contents', path),
40 url_path_join(self.base_url, 'api/contents', path),
41 data=body, params=params,
41 data=body, params=params,
42 )
42 )
43 response.raise_for_status()
43 response.raise_for_status()
44 return response
44 return response
45
45
46 def list(self, path='/'):
46 def list(self, path='/'):
47 return self._req('GET', path)
47 return self._req('GET', path)
48
48
49 def read(self, path, type_=None, format=None):
49 def read(self, path, type=None, format=None):
50 params = {}
50 params = {}
51 if type_ is not None:
51 if type is not None:
52 params['type'] = type_
52 params['type'] = type
53 if format is not None:
53 if format is not None:
54 params['format'] = format
54 params['format'] = format
55 return self._req('GET', path, params=params)
55 return self._req('GET', path, params=params)
56
56
57 def create_untitled(self, path='/', ext='.ipynb'):
57 def create_untitled(self, path='/', ext='.ipynb'):
58 body = None
58 body = None
59 if ext:
59 if ext:
60 body = json.dumps({'ext': ext})
60 body = json.dumps({'ext': ext})
61 return self._req('POST', path, body)
61 return self._req('POST', path, body)
62
62
63 def mkdir_untitled(self, path='/'):
63 def mkdir_untitled(self, path='/'):
64 return self._req('POST', path, json.dumps({'type': 'directory'}))
64 return self._req('POST', path, json.dumps({'type': 'directory'}))
65
65
66 def copy(self, copy_from, path='/'):
66 def copy(self, copy_from, path='/'):
67 body = json.dumps({'copy_from':copy_from})
67 body = json.dumps({'copy_from':copy_from})
68 return self._req('POST', path, body)
68 return self._req('POST', path, body)
69
69
70 def create(self, path='/'):
70 def create(self, path='/'):
71 return self._req('PUT', path)
71 return self._req('PUT', path)
72
72
73 def upload(self, path, body):
73 def upload(self, path, body):
74 return self._req('PUT', path, body)
74 return self._req('PUT', path, body)
75
75
76 def mkdir_untitled(self, path='/'):
76 def mkdir_untitled(self, path='/'):
77 return self._req('POST', path, json.dumps({'type': 'directory'}))
77 return self._req('POST', path, json.dumps({'type': 'directory'}))
78
78
79 def mkdir(self, path='/'):
79 def mkdir(self, path='/'):
80 return self._req('PUT', path, json.dumps({'type': 'directory'}))
80 return self._req('PUT', path, json.dumps({'type': 'directory'}))
81
81
82 def copy_put(self, copy_from, path='/'):
82 def copy_put(self, copy_from, path='/'):
83 body = json.dumps({'copy_from':copy_from})
83 body = json.dumps({'copy_from':copy_from})
84 return self._req('PUT', path, body)
84 return self._req('PUT', path, body)
85
85
86 def save(self, path, body):
86 def save(self, path, body):
87 return self._req('PUT', path, body)
87 return self._req('PUT', path, body)
88
88
89 def delete(self, path='/'):
89 def delete(self, path='/'):
90 return self._req('DELETE', path)
90 return self._req('DELETE', path)
91
91
92 def rename(self, path, new_path):
92 def rename(self, path, new_path):
93 body = json.dumps({'path': new_path})
93 body = json.dumps({'path': new_path})
94 return self._req('PATCH', path, body)
94 return self._req('PATCH', path, body)
95
95
96 def get_checkpoints(self, path):
96 def get_checkpoints(self, path):
97 return self._req('GET', url_path_join(path, 'checkpoints'))
97 return self._req('GET', url_path_join(path, 'checkpoints'))
98
98
99 def new_checkpoint(self, path):
99 def new_checkpoint(self, path):
100 return self._req('POST', url_path_join(path, 'checkpoints'))
100 return self._req('POST', url_path_join(path, 'checkpoints'))
101
101
102 def restore_checkpoint(self, path, checkpoint_id):
102 def restore_checkpoint(self, path, checkpoint_id):
103 return self._req('POST', url_path_join(path, 'checkpoints', checkpoint_id))
103 return self._req('POST', url_path_join(path, 'checkpoints', checkpoint_id))
104
104
105 def delete_checkpoint(self, path, checkpoint_id):
105 def delete_checkpoint(self, path, checkpoint_id):
106 return self._req('DELETE', url_path_join(path, 'checkpoints', checkpoint_id))
106 return self._req('DELETE', url_path_join(path, 'checkpoints', checkpoint_id))
107
107
108 class APITest(NotebookTestBase):
108 class APITest(NotebookTestBase):
109 """Test the kernels web service API"""
109 """Test the kernels web service API"""
110 dirs_nbs = [('', 'inroot'),
110 dirs_nbs = [('', 'inroot'),
111 ('Directory with spaces in', 'inspace'),
111 ('Directory with spaces in', 'inspace'),
112 (u'unicodΓ©', 'innonascii'),
112 (u'unicodΓ©', 'innonascii'),
113 ('foo', 'a'),
113 ('foo', 'a'),
114 ('foo', 'b'),
114 ('foo', 'b'),
115 ('foo', 'name with spaces'),
115 ('foo', 'name with spaces'),
116 ('foo', u'unicodΓ©'),
116 ('foo', u'unicodΓ©'),
117 ('foo/bar', 'baz'),
117 ('foo/bar', 'baz'),
118 ('ordering', 'A'),
118 ('ordering', 'A'),
119 ('ordering', 'b'),
119 ('ordering', 'b'),
120 ('ordering', 'C'),
120 ('ordering', 'C'),
121 (u'Γ₯ b', u'Γ§ d'),
121 (u'Γ₯ b', u'Γ§ d'),
122 ]
122 ]
123 hidden_dirs = ['.hidden', '__pycache__']
123 hidden_dirs = ['.hidden', '__pycache__']
124
124
125 dirs = uniq_stable([py3compat.cast_unicode(d) for (d,n) in dirs_nbs])
125 dirs = uniq_stable([py3compat.cast_unicode(d) for (d,n) in dirs_nbs])
126 del dirs[0] # remove ''
126 del dirs[0] # remove ''
127 top_level_dirs = {normalize('NFC', d.split('/')[0]) for d in dirs}
127 top_level_dirs = {normalize('NFC', d.split('/')[0]) for d in dirs}
128
128
129 @staticmethod
129 @staticmethod
130 def _blob_for_name(name):
130 def _blob_for_name(name):
131 return name.encode('utf-8') + b'\xFF'
131 return name.encode('utf-8') + b'\xFF'
132
132
133 @staticmethod
133 @staticmethod
134 def _txt_for_name(name):
134 def _txt_for_name(name):
135 return u'%s text file' % name
135 return u'%s text file' % name
136
136
137 def setUp(self):
137 def setUp(self):
138 nbdir = self.notebook_dir.name
138 nbdir = self.notebook_dir.name
139 self.blob = os.urandom(100)
139 self.blob = os.urandom(100)
140 self.b64_blob = base64.encodestring(self.blob).decode('ascii')
140 self.b64_blob = base64.encodestring(self.blob).decode('ascii')
141
141
142 for d in (self.dirs + self.hidden_dirs):
142 for d in (self.dirs + self.hidden_dirs):
143 d.replace('/', os.sep)
143 d.replace('/', os.sep)
144 if not os.path.isdir(pjoin(nbdir, d)):
144 if not os.path.isdir(pjoin(nbdir, d)):
145 os.mkdir(pjoin(nbdir, d))
145 os.mkdir(pjoin(nbdir, d))
146
146
147 for d, name in self.dirs_nbs:
147 for d, name in self.dirs_nbs:
148 d = d.replace('/', os.sep)
148 d = d.replace('/', os.sep)
149 # create a notebook
149 # create a notebook
150 with io.open(pjoin(nbdir, d, '%s.ipynb' % name), 'w',
150 with io.open(pjoin(nbdir, d, '%s.ipynb' % name), 'w',
151 encoding='utf-8') as f:
151 encoding='utf-8') as f:
152 nb = new_notebook()
152 nb = new_notebook()
153 write(nb, f, version=4)
153 write(nb, f, version=4)
154
154
155 # create a text file
155 # create a text file
156 with io.open(pjoin(nbdir, d, '%s.txt' % name), 'w',
156 with io.open(pjoin(nbdir, d, '%s.txt' % name), 'w',
157 encoding='utf-8') as f:
157 encoding='utf-8') as f:
158 f.write(self._txt_for_name(name))
158 f.write(self._txt_for_name(name))
159
159
160 # create a binary file
160 # create a binary file
161 with io.open(pjoin(nbdir, d, '%s.blob' % name), 'wb') as f:
161 with io.open(pjoin(nbdir, d, '%s.blob' % name), 'wb') as f:
162 f.write(self._blob_for_name(name))
162 f.write(self._blob_for_name(name))
163
163
164 self.api = API(self.base_url())
164 self.api = API(self.base_url())
165
165
166 def tearDown(self):
166 def tearDown(self):
167 nbdir = self.notebook_dir.name
167 nbdir = self.notebook_dir.name
168
168
169 for dname in (list(self.top_level_dirs) + self.hidden_dirs):
169 for dname in (list(self.top_level_dirs) + self.hidden_dirs):
170 shutil.rmtree(pjoin(nbdir, dname), ignore_errors=True)
170 shutil.rmtree(pjoin(nbdir, dname), ignore_errors=True)
171
171
172 if os.path.isfile(pjoin(nbdir, 'inroot.ipynb')):
172 if os.path.isfile(pjoin(nbdir, 'inroot.ipynb')):
173 os.unlink(pjoin(nbdir, 'inroot.ipynb'))
173 os.unlink(pjoin(nbdir, 'inroot.ipynb'))
174
174
175 def test_list_notebooks(self):
175 def test_list_notebooks(self):
176 nbs = notebooks_only(self.api.list().json())
176 nbs = notebooks_only(self.api.list().json())
177 self.assertEqual(len(nbs), 1)
177 self.assertEqual(len(nbs), 1)
178 self.assertEqual(nbs[0]['name'], 'inroot.ipynb')
178 self.assertEqual(nbs[0]['name'], 'inroot.ipynb')
179
179
180 nbs = notebooks_only(self.api.list('/Directory with spaces in/').json())
180 nbs = notebooks_only(self.api.list('/Directory with spaces in/').json())
181 self.assertEqual(len(nbs), 1)
181 self.assertEqual(len(nbs), 1)
182 self.assertEqual(nbs[0]['name'], 'inspace.ipynb')
182 self.assertEqual(nbs[0]['name'], 'inspace.ipynb')
183
183
184 nbs = notebooks_only(self.api.list(u'/unicodΓ©/').json())
184 nbs = notebooks_only(self.api.list(u'/unicodΓ©/').json())
185 self.assertEqual(len(nbs), 1)
185 self.assertEqual(len(nbs), 1)
186 self.assertEqual(nbs[0]['name'], 'innonascii.ipynb')
186 self.assertEqual(nbs[0]['name'], 'innonascii.ipynb')
187 self.assertEqual(nbs[0]['path'], u'unicodΓ©/innonascii.ipynb')
187 self.assertEqual(nbs[0]['path'], u'unicodΓ©/innonascii.ipynb')
188
188
189 nbs = notebooks_only(self.api.list('/foo/bar/').json())
189 nbs = notebooks_only(self.api.list('/foo/bar/').json())
190 self.assertEqual(len(nbs), 1)
190 self.assertEqual(len(nbs), 1)
191 self.assertEqual(nbs[0]['name'], 'baz.ipynb')
191 self.assertEqual(nbs[0]['name'], 'baz.ipynb')
192 self.assertEqual(nbs[0]['path'], 'foo/bar/baz.ipynb')
192 self.assertEqual(nbs[0]['path'], 'foo/bar/baz.ipynb')
193
193
194 nbs = notebooks_only(self.api.list('foo').json())
194 nbs = notebooks_only(self.api.list('foo').json())
195 self.assertEqual(len(nbs), 4)
195 self.assertEqual(len(nbs), 4)
196 nbnames = { normalize('NFC', n['name']) for n in nbs }
196 nbnames = { normalize('NFC', n['name']) for n in nbs }
197 expected = [ u'a.ipynb', u'b.ipynb', u'name with spaces.ipynb', u'unicodΓ©.ipynb']
197 expected = [ u'a.ipynb', u'b.ipynb', u'name with spaces.ipynb', u'unicodΓ©.ipynb']
198 expected = { normalize('NFC', name) for name in expected }
198 expected = { normalize('NFC', name) for name in expected }
199 self.assertEqual(nbnames, expected)
199 self.assertEqual(nbnames, expected)
200
200
201 nbs = notebooks_only(self.api.list('ordering').json())
201 nbs = notebooks_only(self.api.list('ordering').json())
202 nbnames = [n['name'] for n in nbs]
202 nbnames = [n['name'] for n in nbs]
203 expected = ['A.ipynb', 'b.ipynb', 'C.ipynb']
203 expected = ['A.ipynb', 'b.ipynb', 'C.ipynb']
204 self.assertEqual(nbnames, expected)
204 self.assertEqual(nbnames, expected)
205
205
206 def test_list_dirs(self):
206 def test_list_dirs(self):
207 print(self.api.list().json())
207 print(self.api.list().json())
208 dirs = dirs_only(self.api.list().json())
208 dirs = dirs_only(self.api.list().json())
209 dir_names = {normalize('NFC', d['name']) for d in dirs}
209 dir_names = {normalize('NFC', d['name']) for d in dirs}
210 print(dir_names)
210 print(dir_names)
211 print(self.top_level_dirs)
211 print(self.top_level_dirs)
212 self.assertEqual(dir_names, self.top_level_dirs) # Excluding hidden dirs
212 self.assertEqual(dir_names, self.top_level_dirs) # Excluding hidden dirs
213
213
214 def test_list_nonexistant_dir(self):
214 def test_list_nonexistant_dir(self):
215 with assert_http_error(404):
215 with assert_http_error(404):
216 self.api.list('nonexistant')
216 self.api.list('nonexistant')
217
217
218 def test_get_nb_contents(self):
218 def test_get_nb_contents(self):
219 for d, name in self.dirs_nbs:
219 for d, name in self.dirs_nbs:
220 path = url_path_join(d, name + '.ipynb')
220 path = url_path_join(d, name + '.ipynb')
221 nb = self.api.read(path).json()
221 nb = self.api.read(path).json()
222 self.assertEqual(nb['name'], u'%s.ipynb' % name)
222 self.assertEqual(nb['name'], u'%s.ipynb' % name)
223 self.assertEqual(nb['path'], path)
223 self.assertEqual(nb['path'], path)
224 self.assertEqual(nb['type'], 'notebook')
224 self.assertEqual(nb['type'], 'notebook')
225 self.assertIn('content', nb)
225 self.assertIn('content', nb)
226 self.assertEqual(nb['format'], 'json')
226 self.assertEqual(nb['format'], 'json')
227 self.assertIn('content', nb)
227 self.assertIn('content', nb)
228 self.assertIn('metadata', nb['content'])
228 self.assertIn('metadata', nb['content'])
229 self.assertIsInstance(nb['content']['metadata'], dict)
229 self.assertIsInstance(nb['content']['metadata'], dict)
230
230
231 def test_get_contents_no_such_file(self):
231 def test_get_contents_no_such_file(self):
232 # Name that doesn't exist - should be a 404
232 # Name that doesn't exist - should be a 404
233 with assert_http_error(404):
233 with assert_http_error(404):
234 self.api.read('foo/q.ipynb')
234 self.api.read('foo/q.ipynb')
235
235
236 def test_get_text_file_contents(self):
236 def test_get_text_file_contents(self):
237 for d, name in self.dirs_nbs:
237 for d, name in self.dirs_nbs:
238 path = url_path_join(d, name + '.txt')
238 path = url_path_join(d, name + '.txt')
239 model = self.api.read(path).json()
239 model = self.api.read(path).json()
240 self.assertEqual(model['name'], u'%s.txt' % name)
240 self.assertEqual(model['name'], u'%s.txt' % name)
241 self.assertEqual(model['path'], path)
241 self.assertEqual(model['path'], path)
242 self.assertIn('content', model)
242 self.assertIn('content', model)
243 self.assertEqual(model['format'], 'text')
243 self.assertEqual(model['format'], 'text')
244 self.assertEqual(model['type'], 'file')
244 self.assertEqual(model['type'], 'file')
245 self.assertEqual(model['content'], self._txt_for_name(name))
245 self.assertEqual(model['content'], self._txt_for_name(name))
246
246
247 # Name that doesn't exist - should be a 404
247 # Name that doesn't exist - should be a 404
248 with assert_http_error(404):
248 with assert_http_error(404):
249 self.api.read('foo/q.txt')
249 self.api.read('foo/q.txt')
250
250
251 # Specifying format=text should fail on a non-UTF-8 file
251 # Specifying format=text should fail on a non-UTF-8 file
252 with assert_http_error(400):
252 with assert_http_error(400):
253 self.api.read('foo/bar/baz.blob', type_='file', format='text')
253 self.api.read('foo/bar/baz.blob', type='file', format='text')
254
254
255 def test_get_binary_file_contents(self):
255 def test_get_binary_file_contents(self):
256 for d, name in self.dirs_nbs:
256 for d, name in self.dirs_nbs:
257 path = url_path_join(d, name + '.blob')
257 path = url_path_join(d, name + '.blob')
258 model = self.api.read(path).json()
258 model = self.api.read(path).json()
259 self.assertEqual(model['name'], u'%s.blob' % name)
259 self.assertEqual(model['name'], u'%s.blob' % name)
260 self.assertEqual(model['path'], path)
260 self.assertEqual(model['path'], path)
261 self.assertIn('content', model)
261 self.assertIn('content', model)
262 self.assertEqual(model['format'], 'base64')
262 self.assertEqual(model['format'], 'base64')
263 self.assertEqual(model['type'], 'file')
263 self.assertEqual(model['type'], 'file')
264 b64_data = base64.encodestring(self._blob_for_name(name)).decode('ascii')
264 b64_data = base64.encodestring(self._blob_for_name(name)).decode('ascii')
265 self.assertEqual(model['content'], b64_data)
265 self.assertEqual(model['content'], b64_data)
266
266
267 # Name that doesn't exist - should be a 404
267 # Name that doesn't exist - should be a 404
268 with assert_http_error(404):
268 with assert_http_error(404):
269 self.api.read('foo/q.txt')
269 self.api.read('foo/q.txt')
270
270
271 def test_get_bad_type(self):
271 def test_get_bad_type(self):
272 with assert_http_error(400):
272 with assert_http_error(400):
273 self.api.read(u'unicodΓ©', type_='file') # this is a directory
273 self.api.read(u'unicodΓ©', type='file') # this is a directory
274
274
275 with assert_http_error(400):
275 with assert_http_error(400):
276 self.api.read(u'unicodΓ©/innonascii.ipynb', type_='directory')
276 self.api.read(u'unicodΓ©/innonascii.ipynb', type='directory')
277
277
278 def _check_created(self, resp, path, type='notebook'):
278 def _check_created(self, resp, path, type='notebook'):
279 self.assertEqual(resp.status_code, 201)
279 self.assertEqual(resp.status_code, 201)
280 location_header = py3compat.str_to_unicode(resp.headers['Location'])
280 location_header = py3compat.str_to_unicode(resp.headers['Location'])
281 self.assertEqual(location_header, url_escape(url_path_join(u'/api/contents', path)))
281 self.assertEqual(location_header, url_escape(url_path_join(u'/api/contents', path)))
282 rjson = resp.json()
282 rjson = resp.json()
283 self.assertEqual(rjson['name'], path.rsplit('/', 1)[-1])
283 self.assertEqual(rjson['name'], path.rsplit('/', 1)[-1])
284 self.assertEqual(rjson['path'], path)
284 self.assertEqual(rjson['path'], path)
285 self.assertEqual(rjson['type'], type)
285 self.assertEqual(rjson['type'], type)
286 isright = os.path.isdir if type == 'directory' else os.path.isfile
286 isright = os.path.isdir if type == 'directory' else os.path.isfile
287 assert isright(pjoin(
287 assert isright(pjoin(
288 self.notebook_dir.name,
288 self.notebook_dir.name,
289 path.replace('/', os.sep),
289 path.replace('/', os.sep),
290 ))
290 ))
291
291
292 def test_create_untitled(self):
292 def test_create_untitled(self):
293 resp = self.api.create_untitled(path=u'Γ₯ b')
293 resp = self.api.create_untitled(path=u'Γ₯ b')
294 self._check_created(resp, u'Γ₯ b/Untitled.ipynb')
294 self._check_created(resp, u'Γ₯ b/Untitled.ipynb')
295
295
296 # Second time
296 # Second time
297 resp = self.api.create_untitled(path=u'Γ₯ b')
297 resp = self.api.create_untitled(path=u'Γ₯ b')
298 self._check_created(resp, u'Γ₯ b/Untitled1.ipynb')
298 self._check_created(resp, u'Γ₯ b/Untitled1.ipynb')
299
299
300 # And two directories down
300 # And two directories down
301 resp = self.api.create_untitled(path='foo/bar')
301 resp = self.api.create_untitled(path='foo/bar')
302 self._check_created(resp, 'foo/bar/Untitled.ipynb')
302 self._check_created(resp, 'foo/bar/Untitled.ipynb')
303
303
304 def test_create_untitled_txt(self):
304 def test_create_untitled_txt(self):
305 resp = self.api.create_untitled(path='foo/bar', ext='.txt')
305 resp = self.api.create_untitled(path='foo/bar', ext='.txt')
306 self._check_created(resp, 'foo/bar/untitled.txt', type='file')
306 self._check_created(resp, 'foo/bar/untitled.txt', type='file')
307
307
308 resp = self.api.read(path='foo/bar/untitled.txt')
308 resp = self.api.read(path='foo/bar/untitled.txt')
309 model = resp.json()
309 model = resp.json()
310 self.assertEqual(model['type'], 'file')
310 self.assertEqual(model['type'], 'file')
311 self.assertEqual(model['format'], 'text')
311 self.assertEqual(model['format'], 'text')
312 self.assertEqual(model['content'], '')
312 self.assertEqual(model['content'], '')
313
313
314 def test_upload(self):
314 def test_upload(self):
315 nb = new_notebook()
315 nb = new_notebook()
316 nbmodel = {'content': nb, 'type': 'notebook'}
316 nbmodel = {'content': nb, 'type': 'notebook'}
317 path = u'Γ₯ b/Upload tΓ©st.ipynb'
317 path = u'Γ₯ b/Upload tΓ©st.ipynb'
318 resp = self.api.upload(path, body=json.dumps(nbmodel))
318 resp = self.api.upload(path, body=json.dumps(nbmodel))
319 self._check_created(resp, path)
319 self._check_created(resp, path)
320
320
321 def test_mkdir_untitled(self):
321 def test_mkdir_untitled(self):
322 resp = self.api.mkdir_untitled(path=u'Γ₯ b')
322 resp = self.api.mkdir_untitled(path=u'Γ₯ b')
323 self._check_created(resp, u'Γ₯ b/Untitled Folder', type='directory')
323 self._check_created(resp, u'Γ₯ b/Untitled Folder', type='directory')
324
324
325 # Second time
325 # Second time
326 resp = self.api.mkdir_untitled(path=u'Γ₯ b')
326 resp = self.api.mkdir_untitled(path=u'Γ₯ b')
327 self._check_created(resp, u'Γ₯ b/Untitled Folder 1', type='directory')
327 self._check_created(resp, u'Γ₯ b/Untitled Folder 1', type='directory')
328
328
329 # And two directories down
329 # And two directories down
330 resp = self.api.mkdir_untitled(path='foo/bar')
330 resp = self.api.mkdir_untitled(path='foo/bar')
331 self._check_created(resp, 'foo/bar/Untitled Folder', type='directory')
331 self._check_created(resp, 'foo/bar/Untitled Folder', type='directory')
332
332
333 def test_mkdir(self):
333 def test_mkdir(self):
334 path = u'Γ₯ b/New βˆ‚ir'
334 path = u'Γ₯ b/New βˆ‚ir'
335 resp = self.api.mkdir(path)
335 resp = self.api.mkdir(path)
336 self._check_created(resp, path, type='directory')
336 self._check_created(resp, path, type='directory')
337
337
338 def test_mkdir_hidden_400(self):
338 def test_mkdir_hidden_400(self):
339 with assert_http_error(400):
339 with assert_http_error(400):
340 resp = self.api.mkdir(u'Γ₯ b/.hidden')
340 resp = self.api.mkdir(u'Γ₯ b/.hidden')
341
341
342 def test_upload_txt(self):
342 def test_upload_txt(self):
343 body = u'ΓΌnicode tΓ©xt'
343 body = u'ΓΌnicode tΓ©xt'
344 model = {
344 model = {
345 'content' : body,
345 'content' : body,
346 'format' : 'text',
346 'format' : 'text',
347 'type' : 'file',
347 'type' : 'file',
348 }
348 }
349 path = u'Γ₯ b/Upload tΓ©st.txt'
349 path = u'Γ₯ b/Upload tΓ©st.txt'
350 resp = self.api.upload(path, body=json.dumps(model))
350 resp = self.api.upload(path, body=json.dumps(model))
351
351
352 # check roundtrip
352 # check roundtrip
353 resp = self.api.read(path)
353 resp = self.api.read(path)
354 model = resp.json()
354 model = resp.json()
355 self.assertEqual(model['type'], 'file')
355 self.assertEqual(model['type'], 'file')
356 self.assertEqual(model['format'], 'text')
356 self.assertEqual(model['format'], 'text')
357 self.assertEqual(model['content'], body)
357 self.assertEqual(model['content'], body)
358
358
359 def test_upload_b64(self):
359 def test_upload_b64(self):
360 body = b'\xFFblob'
360 body = b'\xFFblob'
361 b64body = base64.encodestring(body).decode('ascii')
361 b64body = base64.encodestring(body).decode('ascii')
362 model = {
362 model = {
363 'content' : b64body,
363 'content' : b64body,
364 'format' : 'base64',
364 'format' : 'base64',
365 'type' : 'file',
365 'type' : 'file',
366 }
366 }
367 path = u'Γ₯ b/Upload tΓ©st.blob'
367 path = u'Γ₯ b/Upload tΓ©st.blob'
368 resp = self.api.upload(path, body=json.dumps(model))
368 resp = self.api.upload(path, body=json.dumps(model))
369
369
370 # check roundtrip
370 # check roundtrip
371 resp = self.api.read(path)
371 resp = self.api.read(path)
372 model = resp.json()
372 model = resp.json()
373 self.assertEqual(model['type'], 'file')
373 self.assertEqual(model['type'], 'file')
374 self.assertEqual(model['path'], path)
374 self.assertEqual(model['path'], path)
375 self.assertEqual(model['format'], 'base64')
375 self.assertEqual(model['format'], 'base64')
376 decoded = base64.decodestring(model['content'].encode('ascii'))
376 decoded = base64.decodestring(model['content'].encode('ascii'))
377 self.assertEqual(decoded, body)
377 self.assertEqual(decoded, body)
378
378
379 def test_upload_v2(self):
379 def test_upload_v2(self):
380 nb = v2.new_notebook()
380 nb = v2.new_notebook()
381 ws = v2.new_worksheet()
381 ws = v2.new_worksheet()
382 nb.worksheets.append(ws)
382 nb.worksheets.append(ws)
383 ws.cells.append(v2.new_code_cell(input='print("hi")'))
383 ws.cells.append(v2.new_code_cell(input='print("hi")'))
384 nbmodel = {'content': nb, 'type': 'notebook'}
384 nbmodel = {'content': nb, 'type': 'notebook'}
385 path = u'Γ₯ b/Upload tΓ©st.ipynb'
385 path = u'Γ₯ b/Upload tΓ©st.ipynb'
386 resp = self.api.upload(path, body=json.dumps(nbmodel))
386 resp = self.api.upload(path, body=json.dumps(nbmodel))
387 self._check_created(resp, path)
387 self._check_created(resp, path)
388 resp = self.api.read(path)
388 resp = self.api.read(path)
389 data = resp.json()
389 data = resp.json()
390 self.assertEqual(data['content']['nbformat'], 4)
390 self.assertEqual(data['content']['nbformat'], 4)
391
391
392 def test_copy(self):
392 def test_copy(self):
393 resp = self.api.copy(u'Γ₯ b/Γ§ d.ipynb', u'Γ₯ b')
393 resp = self.api.copy(u'Γ₯ b/Γ§ d.ipynb', u'Γ₯ b')
394 self._check_created(resp, u'Γ₯ b/Γ§ d-Copy1.ipynb')
394 self._check_created(resp, u'Γ₯ b/Γ§ d-Copy1.ipynb')
395
395
396 resp = self.api.copy(u'Γ₯ b/Γ§ d.ipynb', u'Γ₯ b')
396 resp = self.api.copy(u'Γ₯ b/Γ§ d.ipynb', u'Γ₯ b')
397 self._check_created(resp, u'Γ₯ b/Γ§ d-Copy2.ipynb')
397 self._check_created(resp, u'Γ₯ b/Γ§ d-Copy2.ipynb')
398
398
399 def test_copy_copy(self):
399 def test_copy_copy(self):
400 resp = self.api.copy(u'Γ₯ b/Γ§ d.ipynb', u'Γ₯ b')
400 resp = self.api.copy(u'Γ₯ b/Γ§ d.ipynb', u'Γ₯ b')
401 self._check_created(resp, u'Γ₯ b/Γ§ d-Copy1.ipynb')
401 self._check_created(resp, u'Γ₯ b/Γ§ d-Copy1.ipynb')
402
402
403 resp = self.api.copy(u'Γ₯ b/Γ§ d-Copy1.ipynb', u'Γ₯ b')
403 resp = self.api.copy(u'Γ₯ b/Γ§ d-Copy1.ipynb', u'Γ₯ b')
404 self._check_created(resp, u'Γ₯ b/Γ§ d-Copy2.ipynb')
404 self._check_created(resp, u'Γ₯ b/Γ§ d-Copy2.ipynb')
405
405
406 def test_copy_path(self):
406 def test_copy_path(self):
407 resp = self.api.copy(u'foo/a.ipynb', u'Γ₯ b')
407 resp = self.api.copy(u'foo/a.ipynb', u'Γ₯ b')
408 self._check_created(resp, u'Γ₯ b/a.ipynb')
408 self._check_created(resp, u'Γ₯ b/a.ipynb')
409
409
410 resp = self.api.copy(u'foo/a.ipynb', u'Γ₯ b')
410 resp = self.api.copy(u'foo/a.ipynb', u'Γ₯ b')
411 self._check_created(resp, u'Γ₯ b/a-Copy1.ipynb')
411 self._check_created(resp, u'Γ₯ b/a-Copy1.ipynb')
412
412
413 def test_copy_put_400(self):
413 def test_copy_put_400(self):
414 with assert_http_error(400):
414 with assert_http_error(400):
415 resp = self.api.copy_put(u'Γ₯ b/Γ§ d.ipynb', u'Γ₯ b/cΓΈpy.ipynb')
415 resp = self.api.copy_put(u'Γ₯ b/Γ§ d.ipynb', u'Γ₯ b/cΓΈpy.ipynb')
416
416
417 def test_copy_dir_400(self):
417 def test_copy_dir_400(self):
418 # can't copy directories
418 # can't copy directories
419 with assert_http_error(400):
419 with assert_http_error(400):
420 resp = self.api.copy(u'Γ₯ b', u'foo')
420 resp = self.api.copy(u'Γ₯ b', u'foo')
421
421
422 def test_delete(self):
422 def test_delete(self):
423 for d, name in self.dirs_nbs:
423 for d, name in self.dirs_nbs:
424 print('%r, %r' % (d, name))
424 print('%r, %r' % (d, name))
425 resp = self.api.delete(url_path_join(d, name + '.ipynb'))
425 resp = self.api.delete(url_path_join(d, name + '.ipynb'))
426 self.assertEqual(resp.status_code, 204)
426 self.assertEqual(resp.status_code, 204)
427
427
428 for d in self.dirs + ['/']:
428 for d in self.dirs + ['/']:
429 nbs = notebooks_only(self.api.list(d).json())
429 nbs = notebooks_only(self.api.list(d).json())
430 print('------')
430 print('------')
431 print(d)
431 print(d)
432 print(nbs)
432 print(nbs)
433 self.assertEqual(nbs, [])
433 self.assertEqual(nbs, [])
434
434
435 def test_delete_dirs(self):
435 def test_delete_dirs(self):
436 # depth-first delete everything, so we don't try to delete empty directories
436 # depth-first delete everything, so we don't try to delete empty directories
437 for name in sorted(self.dirs + ['/'], key=len, reverse=True):
437 for name in sorted(self.dirs + ['/'], key=len, reverse=True):
438 listing = self.api.list(name).json()['content']
438 listing = self.api.list(name).json()['content']
439 for model in listing:
439 for model in listing:
440 self.api.delete(model['path'])
440 self.api.delete(model['path'])
441 listing = self.api.list('/').json()['content']
441 listing = self.api.list('/').json()['content']
442 self.assertEqual(listing, [])
442 self.assertEqual(listing, [])
443
443
444 def test_delete_non_empty_dir(self):
444 def test_delete_non_empty_dir(self):
445 """delete non-empty dir raises 400"""
445 """delete non-empty dir raises 400"""
446 with assert_http_error(400):
446 with assert_http_error(400):
447 self.api.delete(u'Γ₯ b')
447 self.api.delete(u'Γ₯ b')
448
448
449 def test_rename(self):
449 def test_rename(self):
450 resp = self.api.rename('foo/a.ipynb', 'foo/z.ipynb')
450 resp = self.api.rename('foo/a.ipynb', 'foo/z.ipynb')
451 self.assertEqual(resp.headers['Location'].split('/')[-1], 'z.ipynb')
451 self.assertEqual(resp.headers['Location'].split('/')[-1], 'z.ipynb')
452 self.assertEqual(resp.json()['name'], 'z.ipynb')
452 self.assertEqual(resp.json()['name'], 'z.ipynb')
453 self.assertEqual(resp.json()['path'], 'foo/z.ipynb')
453 self.assertEqual(resp.json()['path'], 'foo/z.ipynb')
454 assert os.path.isfile(pjoin(self.notebook_dir.name, 'foo', 'z.ipynb'))
454 assert os.path.isfile(pjoin(self.notebook_dir.name, 'foo', 'z.ipynb'))
455
455
456 nbs = notebooks_only(self.api.list('foo').json())
456 nbs = notebooks_only(self.api.list('foo').json())
457 nbnames = set(n['name'] for n in nbs)
457 nbnames = set(n['name'] for n in nbs)
458 self.assertIn('z.ipynb', nbnames)
458 self.assertIn('z.ipynb', nbnames)
459 self.assertNotIn('a.ipynb', nbnames)
459 self.assertNotIn('a.ipynb', nbnames)
460
460
461 def test_rename_existing(self):
461 def test_rename_existing(self):
462 with assert_http_error(409):
462 with assert_http_error(409):
463 self.api.rename('foo/a.ipynb', 'foo/b.ipynb')
463 self.api.rename('foo/a.ipynb', 'foo/b.ipynb')
464
464
465 def test_save(self):
465 def test_save(self):
466 resp = self.api.read('foo/a.ipynb')
466 resp = self.api.read('foo/a.ipynb')
467 nbcontent = json.loads(resp.text)['content']
467 nbcontent = json.loads(resp.text)['content']
468 nb = from_dict(nbcontent)
468 nb = from_dict(nbcontent)
469 nb.cells.append(new_markdown_cell(u'Created by test Β³'))
469 nb.cells.append(new_markdown_cell(u'Created by test Β³'))
470
470
471 nbmodel= {'content': nb, 'type': 'notebook'}
471 nbmodel= {'content': nb, 'type': 'notebook'}
472 resp = self.api.save('foo/a.ipynb', body=json.dumps(nbmodel))
472 resp = self.api.save('foo/a.ipynb', body=json.dumps(nbmodel))
473
473
474 nbfile = pjoin(self.notebook_dir.name, 'foo', 'a.ipynb')
474 nbfile = pjoin(self.notebook_dir.name, 'foo', 'a.ipynb')
475 with io.open(nbfile, 'r', encoding='utf-8') as f:
475 with io.open(nbfile, 'r', encoding='utf-8') as f:
476 newnb = read(f, as_version=4)
476 newnb = read(f, as_version=4)
477 self.assertEqual(newnb.cells[0].source,
477 self.assertEqual(newnb.cells[0].source,
478 u'Created by test Β³')
478 u'Created by test Β³')
479 nbcontent = self.api.read('foo/a.ipynb').json()['content']
479 nbcontent = self.api.read('foo/a.ipynb').json()['content']
480 newnb = from_dict(nbcontent)
480 newnb = from_dict(nbcontent)
481 self.assertEqual(newnb.cells[0].source,
481 self.assertEqual(newnb.cells[0].source,
482 u'Created by test Β³')
482 u'Created by test Β³')
483
483
484
484
485 def test_checkpoints(self):
485 def test_checkpoints(self):
486 resp = self.api.read('foo/a.ipynb')
486 resp = self.api.read('foo/a.ipynb')
487 r = self.api.new_checkpoint('foo/a.ipynb')
487 r = self.api.new_checkpoint('foo/a.ipynb')
488 self.assertEqual(r.status_code, 201)
488 self.assertEqual(r.status_code, 201)
489 cp1 = r.json()
489 cp1 = r.json()
490 self.assertEqual(set(cp1), {'id', 'last_modified'})
490 self.assertEqual(set(cp1), {'id', 'last_modified'})
491 self.assertEqual(r.headers['Location'].split('/')[-1], cp1['id'])
491 self.assertEqual(r.headers['Location'].split('/')[-1], cp1['id'])
492
492
493 # Modify it
493 # Modify it
494 nbcontent = json.loads(resp.text)['content']
494 nbcontent = json.loads(resp.text)['content']
495 nb = from_dict(nbcontent)
495 nb = from_dict(nbcontent)
496 hcell = new_markdown_cell('Created by test')
496 hcell = new_markdown_cell('Created by test')
497 nb.cells.append(hcell)
497 nb.cells.append(hcell)
498 # Save
498 # Save
499 nbmodel= {'content': nb, 'type': 'notebook'}
499 nbmodel= {'content': nb, 'type': 'notebook'}
500 resp = self.api.save('foo/a.ipynb', body=json.dumps(nbmodel))
500 resp = self.api.save('foo/a.ipynb', body=json.dumps(nbmodel))
501
501
502 # List checkpoints
502 # List checkpoints
503 cps = self.api.get_checkpoints('foo/a.ipynb').json()
503 cps = self.api.get_checkpoints('foo/a.ipynb').json()
504 self.assertEqual(cps, [cp1])
504 self.assertEqual(cps, [cp1])
505
505
506 nbcontent = self.api.read('foo/a.ipynb').json()['content']
506 nbcontent = self.api.read('foo/a.ipynb').json()['content']
507 nb = from_dict(nbcontent)
507 nb = from_dict(nbcontent)
508 self.assertEqual(nb.cells[0].source, 'Created by test')
508 self.assertEqual(nb.cells[0].source, 'Created by test')
509
509
510 # Restore cp1
510 # Restore cp1
511 r = self.api.restore_checkpoint('foo/a.ipynb', cp1['id'])
511 r = self.api.restore_checkpoint('foo/a.ipynb', cp1['id'])
512 self.assertEqual(r.status_code, 204)
512 self.assertEqual(r.status_code, 204)
513 nbcontent = self.api.read('foo/a.ipynb').json()['content']
513 nbcontent = self.api.read('foo/a.ipynb').json()['content']
514 nb = from_dict(nbcontent)
514 nb = from_dict(nbcontent)
515 self.assertEqual(nb.cells, [])
515 self.assertEqual(nb.cells, [])
516
516
517 # Delete cp1
517 # Delete cp1
518 r = self.api.delete_checkpoint('foo/a.ipynb', cp1['id'])
518 r = self.api.delete_checkpoint('foo/a.ipynb', cp1['id'])
519 self.assertEqual(r.status_code, 204)
519 self.assertEqual(r.status_code, 204)
520 cps = self.api.get_checkpoints('foo/a.ipynb').json()
520 cps = self.api.get_checkpoints('foo/a.ipynb').json()
521 self.assertEqual(cps, [])
521 self.assertEqual(cps, [])
@@ -1,369 +1,369 b''
1 # coding: utf-8
1 # coding: utf-8
2 """Tests for the notebook manager."""
2 """Tests for the notebook manager."""
3 from __future__ import print_function
3 from __future__ import print_function
4
4
5 import logging
5 import logging
6 import os
6 import os
7
7
8 from tornado.web import HTTPError
8 from tornado.web import HTTPError
9 from unittest import TestCase
9 from unittest import TestCase
10 from tempfile import NamedTemporaryFile
10 from tempfile import NamedTemporaryFile
11
11
12 from IPython.nbformat import v4 as nbformat
12 from IPython.nbformat import v4 as nbformat
13
13
14 from IPython.utils.tempdir import TemporaryDirectory
14 from IPython.utils.tempdir import TemporaryDirectory
15 from IPython.utils.traitlets import TraitError
15 from IPython.utils.traitlets import TraitError
16 from IPython.html.utils import url_path_join
16 from IPython.html.utils import url_path_join
17 from IPython.testing import decorators as dec
17 from IPython.testing import decorators as dec
18
18
19 from ..filemanager import FileContentsManager
19 from ..filemanager import FileContentsManager
20 from ..manager import ContentsManager
20 from ..manager import ContentsManager
21
21
22
22
23 class TestFileContentsManager(TestCase):
23 class TestFileContentsManager(TestCase):
24
24
25 def test_root_dir(self):
25 def test_root_dir(self):
26 with TemporaryDirectory() as td:
26 with TemporaryDirectory() as td:
27 fm = FileContentsManager(root_dir=td)
27 fm = FileContentsManager(root_dir=td)
28 self.assertEqual(fm.root_dir, td)
28 self.assertEqual(fm.root_dir, td)
29
29
30 def test_missing_root_dir(self):
30 def test_missing_root_dir(self):
31 with TemporaryDirectory() as td:
31 with TemporaryDirectory() as td:
32 root = os.path.join(td, 'notebook', 'dir', 'is', 'missing')
32 root = os.path.join(td, 'notebook', 'dir', 'is', 'missing')
33 self.assertRaises(TraitError, FileContentsManager, root_dir=root)
33 self.assertRaises(TraitError, FileContentsManager, root_dir=root)
34
34
35 def test_invalid_root_dir(self):
35 def test_invalid_root_dir(self):
36 with NamedTemporaryFile() as tf:
36 with NamedTemporaryFile() as tf:
37 self.assertRaises(TraitError, FileContentsManager, root_dir=tf.name)
37 self.assertRaises(TraitError, FileContentsManager, root_dir=tf.name)
38
38
39 def test_get_os_path(self):
39 def test_get_os_path(self):
40 # full filesystem path should be returned with correct operating system
40 # full filesystem path should be returned with correct operating system
41 # separators.
41 # separators.
42 with TemporaryDirectory() as td:
42 with TemporaryDirectory() as td:
43 root = td
43 root = td
44 fm = FileContentsManager(root_dir=root)
44 fm = FileContentsManager(root_dir=root)
45 path = fm._get_os_path('/path/to/notebook/test.ipynb')
45 path = fm._get_os_path('/path/to/notebook/test.ipynb')
46 rel_path_list = '/path/to/notebook/test.ipynb'.split('/')
46 rel_path_list = '/path/to/notebook/test.ipynb'.split('/')
47 fs_path = os.path.join(fm.root_dir, *rel_path_list)
47 fs_path = os.path.join(fm.root_dir, *rel_path_list)
48 self.assertEqual(path, fs_path)
48 self.assertEqual(path, fs_path)
49
49
50 fm = FileContentsManager(root_dir=root)
50 fm = FileContentsManager(root_dir=root)
51 path = fm._get_os_path('test.ipynb')
51 path = fm._get_os_path('test.ipynb')
52 fs_path = os.path.join(fm.root_dir, 'test.ipynb')
52 fs_path = os.path.join(fm.root_dir, 'test.ipynb')
53 self.assertEqual(path, fs_path)
53 self.assertEqual(path, fs_path)
54
54
55 fm = FileContentsManager(root_dir=root)
55 fm = FileContentsManager(root_dir=root)
56 path = fm._get_os_path('////test.ipynb')
56 path = fm._get_os_path('////test.ipynb')
57 fs_path = os.path.join(fm.root_dir, 'test.ipynb')
57 fs_path = os.path.join(fm.root_dir, 'test.ipynb')
58 self.assertEqual(path, fs_path)
58 self.assertEqual(path, fs_path)
59
59
60 def test_checkpoint_subdir(self):
60 def test_checkpoint_subdir(self):
61 subd = u'sub βˆ‚ir'
61 subd = u'sub βˆ‚ir'
62 cp_name = 'test-cp.ipynb'
62 cp_name = 'test-cp.ipynb'
63 with TemporaryDirectory() as td:
63 with TemporaryDirectory() as td:
64 root = td
64 root = td
65 os.mkdir(os.path.join(td, subd))
65 os.mkdir(os.path.join(td, subd))
66 fm = FileContentsManager(root_dir=root)
66 fm = FileContentsManager(root_dir=root)
67 cp_dir = fm.get_checkpoint_path('cp', 'test.ipynb')
67 cp_dir = fm.get_checkpoint_path('cp', 'test.ipynb')
68 cp_subdir = fm.get_checkpoint_path('cp', '/%s/test.ipynb' % subd)
68 cp_subdir = fm.get_checkpoint_path('cp', '/%s/test.ipynb' % subd)
69 self.assertNotEqual(cp_dir, cp_subdir)
69 self.assertNotEqual(cp_dir, cp_subdir)
70 self.assertEqual(cp_dir, os.path.join(root, fm.checkpoint_dir, cp_name))
70 self.assertEqual(cp_dir, os.path.join(root, fm.checkpoint_dir, cp_name))
71 self.assertEqual(cp_subdir, os.path.join(root, subd, fm.checkpoint_dir, cp_name))
71 self.assertEqual(cp_subdir, os.path.join(root, subd, fm.checkpoint_dir, cp_name))
72
72
73
73
74 class TestContentsManager(TestCase):
74 class TestContentsManager(TestCase):
75
75
76 def setUp(self):
76 def setUp(self):
77 self._temp_dir = TemporaryDirectory()
77 self._temp_dir = TemporaryDirectory()
78 self.td = self._temp_dir.name
78 self.td = self._temp_dir.name
79 self.contents_manager = FileContentsManager(
79 self.contents_manager = FileContentsManager(
80 root_dir=self.td,
80 root_dir=self.td,
81 log=logging.getLogger()
81 log=logging.getLogger()
82 )
82 )
83
83
84 def tearDown(self):
84 def tearDown(self):
85 self._temp_dir.cleanup()
85 self._temp_dir.cleanup()
86
86
87 def make_dir(self, abs_path, rel_path):
87 def make_dir(self, abs_path, rel_path):
88 """make subdirectory, rel_path is the relative path
88 """make subdirectory, rel_path is the relative path
89 to that directory from the location where the server started"""
89 to that directory from the location where the server started"""
90 os_path = os.path.join(abs_path, rel_path)
90 os_path = os.path.join(abs_path, rel_path)
91 try:
91 try:
92 os.makedirs(os_path)
92 os.makedirs(os_path)
93 except OSError:
93 except OSError:
94 print("Directory already exists: %r" % os_path)
94 print("Directory already exists: %r" % os_path)
95 return os_path
95 return os_path
96
96
97 def add_code_cell(self, nb):
97 def add_code_cell(self, nb):
98 output = nbformat.new_output("display_data", {'application/javascript': "alert('hi');"})
98 output = nbformat.new_output("display_data", {'application/javascript': "alert('hi');"})
99 cell = nbformat.new_code_cell("print('hi')", outputs=[output])
99 cell = nbformat.new_code_cell("print('hi')", outputs=[output])
100 nb.cells.append(cell)
100 nb.cells.append(cell)
101
101
102 def new_notebook(self):
102 def new_notebook(self):
103 cm = self.contents_manager
103 cm = self.contents_manager
104 model = cm.new_untitled(type='notebook')
104 model = cm.new_untitled(type='notebook')
105 name = model['name']
105 name = model['name']
106 path = model['path']
106 path = model['path']
107
107
108 full_model = cm.get(path)
108 full_model = cm.get(path)
109 nb = full_model['content']
109 nb = full_model['content']
110 self.add_code_cell(nb)
110 self.add_code_cell(nb)
111
111
112 cm.save(full_model, path)
112 cm.save(full_model, path)
113 return nb, name, path
113 return nb, name, path
114
114
115 def test_new_untitled(self):
115 def test_new_untitled(self):
116 cm = self.contents_manager
116 cm = self.contents_manager
117 # Test in root directory
117 # Test in root directory
118 model = cm.new_untitled(type='notebook')
118 model = cm.new_untitled(type='notebook')
119 assert isinstance(model, dict)
119 assert isinstance(model, dict)
120 self.assertIn('name', model)
120 self.assertIn('name', model)
121 self.assertIn('path', model)
121 self.assertIn('path', model)
122 self.assertIn('type', model)
122 self.assertIn('type', model)
123 self.assertEqual(model['type'], 'notebook')
123 self.assertEqual(model['type'], 'notebook')
124 self.assertEqual(model['name'], 'Untitled.ipynb')
124 self.assertEqual(model['name'], 'Untitled.ipynb')
125 self.assertEqual(model['path'], 'Untitled.ipynb')
125 self.assertEqual(model['path'], 'Untitled.ipynb')
126
126
127 # Test in sub-directory
127 # Test in sub-directory
128 model = cm.new_untitled(type='directory')
128 model = cm.new_untitled(type='directory')
129 assert isinstance(model, dict)
129 assert isinstance(model, dict)
130 self.assertIn('name', model)
130 self.assertIn('name', model)
131 self.assertIn('path', model)
131 self.assertIn('path', model)
132 self.assertIn('type', model)
132 self.assertIn('type', model)
133 self.assertEqual(model['type'], 'directory')
133 self.assertEqual(model['type'], 'directory')
134 self.assertEqual(model['name'], 'Untitled Folder')
134 self.assertEqual(model['name'], 'Untitled Folder')
135 self.assertEqual(model['path'], 'Untitled Folder')
135 self.assertEqual(model['path'], 'Untitled Folder')
136 sub_dir = model['path']
136 sub_dir = model['path']
137
137
138 model = cm.new_untitled(path=sub_dir)
138 model = cm.new_untitled(path=sub_dir)
139 assert isinstance(model, dict)
139 assert isinstance(model, dict)
140 self.assertIn('name', model)
140 self.assertIn('name', model)
141 self.assertIn('path', model)
141 self.assertIn('path', model)
142 self.assertIn('type', model)
142 self.assertIn('type', model)
143 self.assertEqual(model['type'], 'file')
143 self.assertEqual(model['type'], 'file')
144 self.assertEqual(model['name'], 'untitled')
144 self.assertEqual(model['name'], 'untitled')
145 self.assertEqual(model['path'], '%s/untitled' % sub_dir)
145 self.assertEqual(model['path'], '%s/untitled' % sub_dir)
146
146
147 def test_get(self):
147 def test_get(self):
148 cm = self.contents_manager
148 cm = self.contents_manager
149 # Create a notebook
149 # Create a notebook
150 model = cm.new_untitled(type='notebook')
150 model = cm.new_untitled(type='notebook')
151 name = model['name']
151 name = model['name']
152 path = model['path']
152 path = model['path']
153
153
154 # Check that we 'get' on the notebook we just created
154 # Check that we 'get' on the notebook we just created
155 model2 = cm.get(path)
155 model2 = cm.get(path)
156 assert isinstance(model2, dict)
156 assert isinstance(model2, dict)
157 self.assertIn('name', model2)
157 self.assertIn('name', model2)
158 self.assertIn('path', model2)
158 self.assertIn('path', model2)
159 self.assertEqual(model['name'], name)
159 self.assertEqual(model['name'], name)
160 self.assertEqual(model['path'], path)
160 self.assertEqual(model['path'], path)
161
161
162 nb_as_file = cm.get(path, content=True, type_='file')
162 nb_as_file = cm.get(path, content=True, type='file')
163 self.assertEqual(nb_as_file['path'], path)
163 self.assertEqual(nb_as_file['path'], path)
164 self.assertEqual(nb_as_file['type'], 'file')
164 self.assertEqual(nb_as_file['type'], 'file')
165 self.assertEqual(nb_as_file['format'], 'text')
165 self.assertEqual(nb_as_file['format'], 'text')
166 self.assertNotIsInstance(nb_as_file['content'], dict)
166 self.assertNotIsInstance(nb_as_file['content'], dict)
167
167
168 nb_as_bin_file = cm.get(path, content=True, type_='file', format='base64')
168 nb_as_bin_file = cm.get(path, content=True, type='file', format='base64')
169 self.assertEqual(nb_as_bin_file['format'], 'base64')
169 self.assertEqual(nb_as_bin_file['format'], 'base64')
170
170
171 # Test in sub-directory
171 # Test in sub-directory
172 sub_dir = '/foo/'
172 sub_dir = '/foo/'
173 self.make_dir(cm.root_dir, 'foo')
173 self.make_dir(cm.root_dir, 'foo')
174 model = cm.new_untitled(path=sub_dir, ext='.ipynb')
174 model = cm.new_untitled(path=sub_dir, ext='.ipynb')
175 model2 = cm.get(sub_dir + name)
175 model2 = cm.get(sub_dir + name)
176 assert isinstance(model2, dict)
176 assert isinstance(model2, dict)
177 self.assertIn('name', model2)
177 self.assertIn('name', model2)
178 self.assertIn('path', model2)
178 self.assertIn('path', model2)
179 self.assertIn('content', model2)
179 self.assertIn('content', model2)
180 self.assertEqual(model2['name'], 'Untitled.ipynb')
180 self.assertEqual(model2['name'], 'Untitled.ipynb')
181 self.assertEqual(model2['path'], '{0}/{1}'.format(sub_dir.strip('/'), name))
181 self.assertEqual(model2['path'], '{0}/{1}'.format(sub_dir.strip('/'), name))
182
182
183 # Test getting directory model
183 # Test getting directory model
184 dirmodel = cm.get('foo')
184 dirmodel = cm.get('foo')
185 self.assertEqual(dirmodel['type'], 'directory')
185 self.assertEqual(dirmodel['type'], 'directory')
186
186
187 with self.assertRaises(HTTPError):
187 with self.assertRaises(HTTPError):
188 cm.get('foo', type_='file')
188 cm.get('foo', type='file')
189
189
190
190
191 @dec.skip_win32
191 @dec.skip_win32
192 def test_bad_symlink(self):
192 def test_bad_symlink(self):
193 cm = self.contents_manager
193 cm = self.contents_manager
194 path = 'test bad symlink'
194 path = 'test bad symlink'
195 os_path = self.make_dir(cm.root_dir, path)
195 os_path = self.make_dir(cm.root_dir, path)
196
196
197 file_model = cm.new_untitled(path=path, ext='.txt')
197 file_model = cm.new_untitled(path=path, ext='.txt')
198
198
199 # create a broken symlink
199 # create a broken symlink
200 os.symlink("target", os.path.join(os_path, "bad symlink"))
200 os.symlink("target", os.path.join(os_path, "bad symlink"))
201 model = cm.get(path)
201 model = cm.get(path)
202 self.assertEqual(model['content'], [file_model])
202 self.assertEqual(model['content'], [file_model])
203
203
204 @dec.skip_win32
204 @dec.skip_win32
205 def test_good_symlink(self):
205 def test_good_symlink(self):
206 cm = self.contents_manager
206 cm = self.contents_manager
207 parent = 'test good symlink'
207 parent = 'test good symlink'
208 name = 'good symlink'
208 name = 'good symlink'
209 path = '{0}/{1}'.format(parent, name)
209 path = '{0}/{1}'.format(parent, name)
210 os_path = self.make_dir(cm.root_dir, parent)
210 os_path = self.make_dir(cm.root_dir, parent)
211
211
212 file_model = cm.new(path=parent + '/zfoo.txt')
212 file_model = cm.new(path=parent + '/zfoo.txt')
213
213
214 # create a good symlink
214 # create a good symlink
215 os.symlink(file_model['name'], os.path.join(os_path, name))
215 os.symlink(file_model['name'], os.path.join(os_path, name))
216 symlink_model = cm.get(path, content=False)
216 symlink_model = cm.get(path, content=False)
217 dir_model = cm.get(parent)
217 dir_model = cm.get(parent)
218 self.assertEqual(
218 self.assertEqual(
219 sorted(dir_model['content'], key=lambda x: x['name']),
219 sorted(dir_model['content'], key=lambda x: x['name']),
220 [symlink_model, file_model],
220 [symlink_model, file_model],
221 )
221 )
222
222
223 def test_update(self):
223 def test_update(self):
224 cm = self.contents_manager
224 cm = self.contents_manager
225 # Create a notebook
225 # Create a notebook
226 model = cm.new_untitled(type='notebook')
226 model = cm.new_untitled(type='notebook')
227 name = model['name']
227 name = model['name']
228 path = model['path']
228 path = model['path']
229
229
230 # Change the name in the model for rename
230 # Change the name in the model for rename
231 model['path'] = 'test.ipynb'
231 model['path'] = 'test.ipynb'
232 model = cm.update(model, path)
232 model = cm.update(model, path)
233 assert isinstance(model, dict)
233 assert isinstance(model, dict)
234 self.assertIn('name', model)
234 self.assertIn('name', model)
235 self.assertIn('path', model)
235 self.assertIn('path', model)
236 self.assertEqual(model['name'], 'test.ipynb')
236 self.assertEqual(model['name'], 'test.ipynb')
237
237
238 # Make sure the old name is gone
238 # Make sure the old name is gone
239 self.assertRaises(HTTPError, cm.get, path)
239 self.assertRaises(HTTPError, cm.get, path)
240
240
241 # Test in sub-directory
241 # Test in sub-directory
242 # Create a directory and notebook in that directory
242 # Create a directory and notebook in that directory
243 sub_dir = '/foo/'
243 sub_dir = '/foo/'
244 self.make_dir(cm.root_dir, 'foo')
244 self.make_dir(cm.root_dir, 'foo')
245 model = cm.new_untitled(path=sub_dir, type='notebook')
245 model = cm.new_untitled(path=sub_dir, type='notebook')
246 name = model['name']
246 name = model['name']
247 path = model['path']
247 path = model['path']
248
248
249 # Change the name in the model for rename
249 # Change the name in the model for rename
250 d = path.rsplit('/', 1)[0]
250 d = path.rsplit('/', 1)[0]
251 new_path = model['path'] = d + '/test_in_sub.ipynb'
251 new_path = model['path'] = d + '/test_in_sub.ipynb'
252 model = cm.update(model, path)
252 model = cm.update(model, path)
253 assert isinstance(model, dict)
253 assert isinstance(model, dict)
254 self.assertIn('name', model)
254 self.assertIn('name', model)
255 self.assertIn('path', model)
255 self.assertIn('path', model)
256 self.assertEqual(model['name'], 'test_in_sub.ipynb')
256 self.assertEqual(model['name'], 'test_in_sub.ipynb')
257 self.assertEqual(model['path'], new_path)
257 self.assertEqual(model['path'], new_path)
258
258
259 # Make sure the old name is gone
259 # Make sure the old name is gone
260 self.assertRaises(HTTPError, cm.get, path)
260 self.assertRaises(HTTPError, cm.get, path)
261
261
262 def test_save(self):
262 def test_save(self):
263 cm = self.contents_manager
263 cm = self.contents_manager
264 # Create a notebook
264 # Create a notebook
265 model = cm.new_untitled(type='notebook')
265 model = cm.new_untitled(type='notebook')
266 name = model['name']
266 name = model['name']
267 path = model['path']
267 path = model['path']
268
268
269 # Get the model with 'content'
269 # Get the model with 'content'
270 full_model = cm.get(path)
270 full_model = cm.get(path)
271
271
272 # Save the notebook
272 # Save the notebook
273 model = cm.save(full_model, path)
273 model = cm.save(full_model, path)
274 assert isinstance(model, dict)
274 assert isinstance(model, dict)
275 self.assertIn('name', model)
275 self.assertIn('name', model)
276 self.assertIn('path', model)
276 self.assertIn('path', model)
277 self.assertEqual(model['name'], name)
277 self.assertEqual(model['name'], name)
278 self.assertEqual(model['path'], path)
278 self.assertEqual(model['path'], path)
279
279
280 # Test in sub-directory
280 # Test in sub-directory
281 # Create a directory and notebook in that directory
281 # Create a directory and notebook in that directory
282 sub_dir = '/foo/'
282 sub_dir = '/foo/'
283 self.make_dir(cm.root_dir, 'foo')
283 self.make_dir(cm.root_dir, 'foo')
284 model = cm.new_untitled(path=sub_dir, type='notebook')
284 model = cm.new_untitled(path=sub_dir, type='notebook')
285 name = model['name']
285 name = model['name']
286 path = model['path']
286 path = model['path']
287 model = cm.get(path)
287 model = cm.get(path)
288
288
289 # Change the name in the model for rename
289 # Change the name in the model for rename
290 model = cm.save(model, path)
290 model = cm.save(model, path)
291 assert isinstance(model, dict)
291 assert isinstance(model, dict)
292 self.assertIn('name', model)
292 self.assertIn('name', model)
293 self.assertIn('path', model)
293 self.assertIn('path', model)
294 self.assertEqual(model['name'], 'Untitled.ipynb')
294 self.assertEqual(model['name'], 'Untitled.ipynb')
295 self.assertEqual(model['path'], 'foo/Untitled.ipynb')
295 self.assertEqual(model['path'], 'foo/Untitled.ipynb')
296
296
297 def test_delete(self):
297 def test_delete(self):
298 cm = self.contents_manager
298 cm = self.contents_manager
299 # Create a notebook
299 # Create a notebook
300 nb, name, path = self.new_notebook()
300 nb, name, path = self.new_notebook()
301
301
302 # Delete the notebook
302 # Delete the notebook
303 cm.delete(path)
303 cm.delete(path)
304
304
305 # Check that a 'get' on the deleted notebook raises and error
305 # Check that a 'get' on the deleted notebook raises and error
306 self.assertRaises(HTTPError, cm.get, path)
306 self.assertRaises(HTTPError, cm.get, path)
307
307
308 def test_copy(self):
308 def test_copy(self):
309 cm = self.contents_manager
309 cm = self.contents_manager
310 parent = u'Γ₯ b'
310 parent = u'Γ₯ b'
311 name = u'nb √.ipynb'
311 name = u'nb √.ipynb'
312 path = u'{0}/{1}'.format(parent, name)
312 path = u'{0}/{1}'.format(parent, name)
313 os.mkdir(os.path.join(cm.root_dir, parent))
313 os.mkdir(os.path.join(cm.root_dir, parent))
314 orig = cm.new(path=path)
314 orig = cm.new(path=path)
315
315
316 # copy with unspecified name
316 # copy with unspecified name
317 copy = cm.copy(path)
317 copy = cm.copy(path)
318 self.assertEqual(copy['name'], orig['name'].replace('.ipynb', '-Copy1.ipynb'))
318 self.assertEqual(copy['name'], orig['name'].replace('.ipynb', '-Copy1.ipynb'))
319
319
320 # copy with specified name
320 # copy with specified name
321 copy2 = cm.copy(path, u'Γ₯ b/copy 2.ipynb')
321 copy2 = cm.copy(path, u'Γ₯ b/copy 2.ipynb')
322 self.assertEqual(copy2['name'], u'copy 2.ipynb')
322 self.assertEqual(copy2['name'], u'copy 2.ipynb')
323 self.assertEqual(copy2['path'], u'Γ₯ b/copy 2.ipynb')
323 self.assertEqual(copy2['path'], u'Γ₯ b/copy 2.ipynb')
324 # copy with specified path
324 # copy with specified path
325 copy2 = cm.copy(path, u'/')
325 copy2 = cm.copy(path, u'/')
326 self.assertEqual(copy2['name'], name)
326 self.assertEqual(copy2['name'], name)
327 self.assertEqual(copy2['path'], name)
327 self.assertEqual(copy2['path'], name)
328
328
329 def test_trust_notebook(self):
329 def test_trust_notebook(self):
330 cm = self.contents_manager
330 cm = self.contents_manager
331 nb, name, path = self.new_notebook()
331 nb, name, path = self.new_notebook()
332
332
333 untrusted = cm.get(path)['content']
333 untrusted = cm.get(path)['content']
334 assert not cm.notary.check_cells(untrusted)
334 assert not cm.notary.check_cells(untrusted)
335
335
336 # print(untrusted)
336 # print(untrusted)
337 cm.trust_notebook(path)
337 cm.trust_notebook(path)
338 trusted = cm.get(path)['content']
338 trusted = cm.get(path)['content']
339 # print(trusted)
339 # print(trusted)
340 assert cm.notary.check_cells(trusted)
340 assert cm.notary.check_cells(trusted)
341
341
342 def test_mark_trusted_cells(self):
342 def test_mark_trusted_cells(self):
343 cm = self.contents_manager
343 cm = self.contents_manager
344 nb, name, path = self.new_notebook()
344 nb, name, path = self.new_notebook()
345
345
346 cm.mark_trusted_cells(nb, path)
346 cm.mark_trusted_cells(nb, path)
347 for cell in nb.cells:
347 for cell in nb.cells:
348 if cell.cell_type == 'code':
348 if cell.cell_type == 'code':
349 assert not cell.metadata.trusted
349 assert not cell.metadata.trusted
350
350
351 cm.trust_notebook(path)
351 cm.trust_notebook(path)
352 nb = cm.get(path)['content']
352 nb = cm.get(path)['content']
353 for cell in nb.cells:
353 for cell in nb.cells:
354 if cell.cell_type == 'code':
354 if cell.cell_type == 'code':
355 assert cell.metadata.trusted
355 assert cell.metadata.trusted
356
356
357 def test_check_and_sign(self):
357 def test_check_and_sign(self):
358 cm = self.contents_manager
358 cm = self.contents_manager
359 nb, name, path = self.new_notebook()
359 nb, name, path = self.new_notebook()
360
360
361 cm.mark_trusted_cells(nb, path)
361 cm.mark_trusted_cells(nb, path)
362 cm.check_and_sign(nb, path)
362 cm.check_and_sign(nb, path)
363 assert not cm.notary.check_signature(nb)
363 assert not cm.notary.check_signature(nb)
364
364
365 cm.trust_notebook(path)
365 cm.trust_notebook(path)
366 nb = cm.get(path)['content']
366 nb = cm.get(path)['content']
367 cm.mark_trusted_cells(nb, path)
367 cm.mark_trusted_cells(nb, path)
368 cm.check_and_sign(nb, path)
368 cm.check_and_sign(nb, path)
369 assert cm.notary.check_signature(nb)
369 assert cm.notary.check_signature(nb)
General Comments 0
You need to be logged in to leave comments. Login now