##// END OF EJS Templates
Add type parameter for contents GET requests
Thomas Kluyver -
Show More
@@ -1,534 +1,545 b''
1 """A contents manager that uses the local file system for storage."""
1 """A contents manager that uses the local file system for storage."""
2
2
3 # Copyright (c) IPython Development Team.
3 # Copyright (c) IPython Development Team.
4 # Distributed under the terms of the Modified BSD License.
4 # Distributed under the terms of the Modified BSD License.
5
5
6 import base64
6 import base64
7 import io
7 import io
8 import os
8 import os
9 import glob
9 import glob
10 import shutil
10 import shutil
11
11
12 from tornado import web
12 from tornado import web
13
13
14 from .manager import ContentsManager
14 from .manager import ContentsManager
15 from IPython import nbformat
15 from IPython import nbformat
16 from IPython.utils.io import atomic_writing
16 from IPython.utils.io import atomic_writing
17 from IPython.utils.path import ensure_dir_exists
17 from IPython.utils.path import ensure_dir_exists
18 from IPython.utils.traitlets import Unicode, Bool, TraitError
18 from IPython.utils.traitlets import Unicode, Bool, TraitError
19 from IPython.utils.py3compat import getcwd
19 from IPython.utils.py3compat import getcwd
20 from IPython.utils import tz
20 from IPython.utils import tz
21 from IPython.html.utils import is_hidden, to_os_path, url_path_join
21 from IPython.html.utils import is_hidden, to_os_path, url_path_join
22
22
23
23
24 class FileContentsManager(ContentsManager):
24 class FileContentsManager(ContentsManager):
25
25
26 root_dir = Unicode(getcwd(), config=True)
26 root_dir = Unicode(getcwd(), config=True)
27
27
28 save_script = Bool(False, config=True, help='DEPRECATED, IGNORED')
28 save_script = Bool(False, config=True, help='DEPRECATED, IGNORED')
29 def _save_script_changed(self):
29 def _save_script_changed(self):
30 self.log.warn("""
30 self.log.warn("""
31 Automatically saving notebooks as scripts has been removed.
31 Automatically saving notebooks as scripts has been removed.
32 Use `ipython nbconvert --to python [notebook]` instead.
32 Use `ipython nbconvert --to python [notebook]` instead.
33 """)
33 """)
34
34
35 def _root_dir_changed(self, name, old, new):
35 def _root_dir_changed(self, name, old, new):
36 """Do a bit of validation of the root_dir."""
36 """Do a bit of validation of the root_dir."""
37 if not os.path.isabs(new):
37 if not os.path.isabs(new):
38 # If we receive a non-absolute path, make it absolute.
38 # If we receive a non-absolute path, make it absolute.
39 self.root_dir = os.path.abspath(new)
39 self.root_dir = os.path.abspath(new)
40 return
40 return
41 if not os.path.isdir(new):
41 if not os.path.isdir(new):
42 raise TraitError("%r is not a directory" % new)
42 raise TraitError("%r is not a directory" % new)
43
43
44 checkpoint_dir = Unicode('.ipynb_checkpoints', config=True,
44 checkpoint_dir = Unicode('.ipynb_checkpoints', config=True,
45 help="""The directory name in which to keep file checkpoints
45 help="""The directory name in which to keep file checkpoints
46
46
47 This is a path relative to the file's own directory.
47 This is a path relative to the file's own directory.
48
48
49 By default, it is .ipynb_checkpoints
49 By default, it is .ipynb_checkpoints
50 """
50 """
51 )
51 )
52
52
53 def _copy(self, src, dest):
53 def _copy(self, src, dest):
54 """copy src to dest
54 """copy src to dest
55
55
56 like shutil.copy2, but log errors in copystat
56 like shutil.copy2, but log errors in copystat
57 """
57 """
58 shutil.copyfile(src, dest)
58 shutil.copyfile(src, dest)
59 try:
59 try:
60 shutil.copystat(src, dest)
60 shutil.copystat(src, dest)
61 except OSError as e:
61 except OSError as e:
62 self.log.debug("copystat on %s failed", dest, exc_info=True)
62 self.log.debug("copystat on %s failed", dest, exc_info=True)
63
63
64 def _get_os_path(self, path):
64 def _get_os_path(self, path):
65 """Given an API path, return its file system path.
65 """Given an API path, return its file system path.
66
66
67 Parameters
67 Parameters
68 ----------
68 ----------
69 path : string
69 path : string
70 The relative API path to the named file.
70 The relative API path to the named file.
71
71
72 Returns
72 Returns
73 -------
73 -------
74 path : string
74 path : string
75 Native, absolute OS path to for a file.
75 Native, absolute OS path to for a file.
76 """
76 """
77 return to_os_path(path, self.root_dir)
77 return to_os_path(path, self.root_dir)
78
78
79 def dir_exists(self, path):
79 def dir_exists(self, path):
80 """Does the API-style path refer to an extant directory?
80 """Does the API-style path refer to an extant directory?
81
81
82 API-style wrapper for os.path.isdir
82 API-style wrapper for os.path.isdir
83
83
84 Parameters
84 Parameters
85 ----------
85 ----------
86 path : string
86 path : string
87 The path to check. This is an API path (`/` separated,
87 The path to check. This is an API path (`/` separated,
88 relative to root_dir).
88 relative to root_dir).
89
89
90 Returns
90 Returns
91 -------
91 -------
92 exists : bool
92 exists : bool
93 Whether the path is indeed a directory.
93 Whether the path is indeed a directory.
94 """
94 """
95 path = path.strip('/')
95 path = path.strip('/')
96 os_path = self._get_os_path(path=path)
96 os_path = self._get_os_path(path=path)
97 return os.path.isdir(os_path)
97 return os.path.isdir(os_path)
98
98
99 def is_hidden(self, path):
99 def is_hidden(self, path):
100 """Does the API style path correspond to a hidden directory or file?
100 """Does the API style path correspond to a hidden directory or file?
101
101
102 Parameters
102 Parameters
103 ----------
103 ----------
104 path : string
104 path : string
105 The path to check. This is an API path (`/` separated,
105 The path to check. This is an API path (`/` separated,
106 relative to root_dir).
106 relative to root_dir).
107
107
108 Returns
108 Returns
109 -------
109 -------
110 hidden : bool
110 hidden : bool
111 Whether the path exists and is hidden.
111 Whether the path exists and is hidden.
112 """
112 """
113 path = path.strip('/')
113 path = path.strip('/')
114 os_path = self._get_os_path(path=path)
114 os_path = self._get_os_path(path=path)
115 return is_hidden(os_path, self.root_dir)
115 return is_hidden(os_path, self.root_dir)
116
116
117 def file_exists(self, path):
117 def file_exists(self, path):
118 """Returns True if the file exists, else returns False.
118 """Returns True if the file exists, else returns False.
119
119
120 API-style wrapper for os.path.isfile
120 API-style wrapper for os.path.isfile
121
121
122 Parameters
122 Parameters
123 ----------
123 ----------
124 path : string
124 path : string
125 The relative path to the file (with '/' as separator)
125 The relative path to the file (with '/' as separator)
126
126
127 Returns
127 Returns
128 -------
128 -------
129 exists : bool
129 exists : bool
130 Whether the file exists.
130 Whether the file exists.
131 """
131 """
132 path = path.strip('/')
132 path = path.strip('/')
133 os_path = self._get_os_path(path)
133 os_path = self._get_os_path(path)
134 return os.path.isfile(os_path)
134 return os.path.isfile(os_path)
135
135
136 def exists(self, path):
136 def exists(self, path):
137 """Returns True if the path exists, else returns False.
137 """Returns True if the path exists, else returns False.
138
138
139 API-style wrapper for os.path.exists
139 API-style wrapper for os.path.exists
140
140
141 Parameters
141 Parameters
142 ----------
142 ----------
143 path : string
143 path : string
144 The API path to the file (with '/' as separator)
144 The API path to the file (with '/' as separator)
145
145
146 Returns
146 Returns
147 -------
147 -------
148 exists : bool
148 exists : bool
149 Whether the target exists.
149 Whether the target exists.
150 """
150 """
151 path = path.strip('/')
151 path = path.strip('/')
152 os_path = self._get_os_path(path=path)
152 os_path = self._get_os_path(path=path)
153 return os.path.exists(os_path)
153 return os.path.exists(os_path)
154
154
155 def _base_model(self, path):
155 def _base_model(self, path):
156 """Build the common base of a contents model"""
156 """Build the common base of a contents model"""
157 os_path = self._get_os_path(path)
157 os_path = self._get_os_path(path)
158 info = os.stat(os_path)
158 info = os.stat(os_path)
159 last_modified = tz.utcfromtimestamp(info.st_mtime)
159 last_modified = tz.utcfromtimestamp(info.st_mtime)
160 created = tz.utcfromtimestamp(info.st_ctime)
160 created = tz.utcfromtimestamp(info.st_ctime)
161 # Create the base model.
161 # Create the base model.
162 model = {}
162 model = {}
163 model['name'] = path.rsplit('/', 1)[-1]
163 model['name'] = path.rsplit('/', 1)[-1]
164 model['path'] = path
164 model['path'] = path
165 model['last_modified'] = last_modified
165 model['last_modified'] = last_modified
166 model['created'] = created
166 model['created'] = created
167 model['content'] = None
167 model['content'] = None
168 model['format'] = None
168 model['format'] = None
169 return model
169 return model
170
170
171 def _dir_model(self, path, content=True):
171 def _dir_model(self, path, content=True):
172 """Build a model for a directory
172 """Build a model for a directory
173
173
174 if content is requested, will include a listing of the directory
174 if content is requested, will include a listing of the directory
175 """
175 """
176 os_path = self._get_os_path(path)
176 os_path = self._get_os_path(path)
177
177
178 four_o_four = u'directory does not exist: %r' % os_path
178 four_o_four = u'directory does not exist: %r' % os_path
179
179
180 if not os.path.isdir(os_path):
180 if not os.path.isdir(os_path):
181 raise web.HTTPError(404, four_o_four)
181 raise web.HTTPError(404, four_o_four)
182 elif is_hidden(os_path, self.root_dir):
182 elif is_hidden(os_path, self.root_dir):
183 self.log.info("Refusing to serve hidden directory %r, via 404 Error",
183 self.log.info("Refusing to serve hidden directory %r, via 404 Error",
184 os_path
184 os_path
185 )
185 )
186 raise web.HTTPError(404, four_o_four)
186 raise web.HTTPError(404, four_o_four)
187
187
188 model = self._base_model(path)
188 model = self._base_model(path)
189 model['type'] = 'directory'
189 model['type'] = 'directory'
190 if content:
190 if content:
191 model['content'] = contents = []
191 model['content'] = contents = []
192 os_dir = self._get_os_path(path)
192 os_dir = self._get_os_path(path)
193 for name in os.listdir(os_dir):
193 for name in os.listdir(os_dir):
194 os_path = os.path.join(os_dir, name)
194 os_path = os.path.join(os_dir, name)
195 # skip over broken symlinks in listing
195 # skip over broken symlinks in listing
196 if not os.path.exists(os_path):
196 if not os.path.exists(os_path):
197 self.log.warn("%s doesn't exist", os_path)
197 self.log.warn("%s doesn't exist", os_path)
198 continue
198 continue
199 elif not os.path.isfile(os_path) and not os.path.isdir(os_path):
199 elif not os.path.isfile(os_path) and not os.path.isdir(os_path):
200 self.log.debug("%s not a regular file", os_path)
200 self.log.debug("%s not a regular file", os_path)
201 continue
201 continue
202 if self.should_list(name) and not is_hidden(os_path, self.root_dir):
202 if self.should_list(name) and not is_hidden(os_path, self.root_dir):
203 contents.append(self.get_model(
203 contents.append(self.get_model(
204 path='%s/%s' % (path, name),
204 path='%s/%s' % (path, name),
205 content=False)
205 content=False)
206 )
206 )
207
207
208 model['format'] = 'json'
208 model['format'] = 'json'
209
209
210 return model
210 return model
211
211
212 def _file_model(self, path, content=True):
212 def _file_model(self, path, content=True):
213 """Build a model for a file
213 """Build a model for a file
214
214
215 if content is requested, include the file contents.
215 if content is requested, include the file contents.
216 UTF-8 text files will be unicode, binary files will be base64-encoded.
216 UTF-8 text files will be unicode, binary files will be base64-encoded.
217 """
217 """
218 model = self._base_model(path)
218 model = self._base_model(path)
219 model['type'] = 'file'
219 model['type'] = 'file'
220 if content:
220 if content:
221 os_path = self._get_os_path(path)
221 os_path = self._get_os_path(path)
222 if not os.path.isfile(os_path):
222 if not os.path.isfile(os_path):
223 # could be FIFO
223 # could be FIFO
224 raise web.HTTPError(400, "Cannot get content of non-file %s" % os_path)
224 raise web.HTTPError(400, "Cannot get content of non-file %s" % os_path)
225 with io.open(os_path, 'rb') as f:
225 with io.open(os_path, 'rb') as f:
226 bcontent = f.read()
226 bcontent = f.read()
227 try:
227 try:
228 model['content'] = bcontent.decode('utf8')
228 model['content'] = bcontent.decode('utf8')
229 except UnicodeError as e:
229 except UnicodeError as e:
230 model['content'] = base64.encodestring(bcontent).decode('ascii')
230 model['content'] = base64.encodestring(bcontent).decode('ascii')
231 model['format'] = 'base64'
231 model['format'] = 'base64'
232 else:
232 else:
233 model['format'] = 'text'
233 model['format'] = 'text'
234 return model
234 return model
235
235
236
236
237 def _notebook_model(self, path, content=True):
237 def _notebook_model(self, path, content=True):
238 """Build a notebook model
238 """Build a notebook model
239
239
240 if content is requested, the notebook content will be populated
240 if content is requested, the notebook content will be populated
241 as a JSON structure (not double-serialized)
241 as a JSON structure (not double-serialized)
242 """
242 """
243 model = self._base_model(path)
243 model = self._base_model(path)
244 model['type'] = 'notebook'
244 model['type'] = 'notebook'
245 if content:
245 if content:
246 os_path = self._get_os_path(path)
246 os_path = self._get_os_path(path)
247 with io.open(os_path, 'r', encoding='utf-8') as f:
247 with io.open(os_path, 'r', encoding='utf-8') as f:
248 try:
248 try:
249 nb = nbformat.read(f, as_version=4)
249 nb = nbformat.read(f, as_version=4)
250 except Exception as e:
250 except Exception as e:
251 raise web.HTTPError(400, u"Unreadable Notebook: %s %r" % (os_path, e))
251 raise web.HTTPError(400, u"Unreadable Notebook: %s %r" % (os_path, e))
252 self.mark_trusted_cells(nb, path)
252 self.mark_trusted_cells(nb, path)
253 model['content'] = nb
253 model['content'] = nb
254 model['format'] = 'json'
254 model['format'] = 'json'
255 self.validate_notebook_model(model)
255 self.validate_notebook_model(model)
256 return model
256 return model
257
257
258 def get_model(self, path, content=True):
258 def get_model(self, path, content=True, type_=None):
259 """ Takes a path for an entity and returns its model
259 """ Takes a path for an entity and returns its model
260
260
261 Parameters
261 Parameters
262 ----------
262 ----------
263 path : str
263 path : str
264 the API path that describes the relative path for the target
264 the API path that describes the relative path for the target
265 content : bool
266 Whether to include the contents in the reply
267 type_ : str, optional
268 The requested type - 'file', 'notebook', or 'directory'.
269 Will raise HTTPError 406 if the content doesn't match.
265
270
266 Returns
271 Returns
267 -------
272 -------
268 model : dict
273 model : dict
269 the contents model. If content=True, returns the contents
274 the contents model. If content=True, returns the contents
270 of the file or directory as well.
275 of the file or directory as well.
271 """
276 """
272 path = path.strip('/')
277 path = path.strip('/')
273
278
274 if not self.exists(path):
279 if not self.exists(path):
275 raise web.HTTPError(404, u'No such file or directory: %s' % path)
280 raise web.HTTPError(404, u'No such file or directory: %s' % path)
276
281
277 os_path = self._get_os_path(path)
282 os_path = self._get_os_path(path)
278 if os.path.isdir(os_path):
283 if os.path.isdir(os_path):
284 if type_ not in (None, 'directory'):
285 raise web.HTTPError(400,
286 u'%s is a directory, not a %s' % (path, type_))
279 model = self._dir_model(path, content=content)
287 model = self._dir_model(path, content=content)
280 elif path.endswith('.ipynb'):
288 elif type_ == 'notebook' or (type_ is None and path.endswith('.ipynb')):
281 model = self._notebook_model(path, content=content)
289 model = self._notebook_model(path, content=content)
282 else:
290 else:
291 if type_ == 'directory':
292 raise web.HTTPError(400,
293 u'%s is not a directory')
283 model = self._file_model(path, content=content)
294 model = self._file_model(path, content=content)
284 return model
295 return model
285
296
286 def _save_notebook(self, os_path, model, path=''):
297 def _save_notebook(self, os_path, model, path=''):
287 """save a notebook file"""
298 """save a notebook file"""
288 # Save the notebook file
299 # Save the notebook file
289 nb = nbformat.from_dict(model['content'])
300 nb = nbformat.from_dict(model['content'])
290
301
291 self.check_and_sign(nb, path)
302 self.check_and_sign(nb, path)
292
303
293 with atomic_writing(os_path, encoding='utf-8') as f:
304 with atomic_writing(os_path, encoding='utf-8') as f:
294 nbformat.write(nb, f, version=nbformat.NO_CONVERT)
305 nbformat.write(nb, f, version=nbformat.NO_CONVERT)
295
306
296 def _save_file(self, os_path, model, path=''):
307 def _save_file(self, os_path, model, path=''):
297 """save a non-notebook file"""
308 """save a non-notebook file"""
298 fmt = model.get('format', None)
309 fmt = model.get('format', None)
299 if fmt not in {'text', 'base64'}:
310 if fmt not in {'text', 'base64'}:
300 raise web.HTTPError(400, "Must specify format of file contents as 'text' or 'base64'")
311 raise web.HTTPError(400, "Must specify format of file contents as 'text' or 'base64'")
301 try:
312 try:
302 content = model['content']
313 content = model['content']
303 if fmt == 'text':
314 if fmt == 'text':
304 bcontent = content.encode('utf8')
315 bcontent = content.encode('utf8')
305 else:
316 else:
306 b64_bytes = content.encode('ascii')
317 b64_bytes = content.encode('ascii')
307 bcontent = base64.decodestring(b64_bytes)
318 bcontent = base64.decodestring(b64_bytes)
308 except Exception as e:
319 except Exception as e:
309 raise web.HTTPError(400, u'Encoding error saving %s: %s' % (os_path, e))
320 raise web.HTTPError(400, u'Encoding error saving %s: %s' % (os_path, e))
310 with atomic_writing(os_path, text=False) as f:
321 with atomic_writing(os_path, text=False) as f:
311 f.write(bcontent)
322 f.write(bcontent)
312
323
313 def _save_directory(self, os_path, model, path=''):
324 def _save_directory(self, os_path, model, path=''):
314 """create a directory"""
325 """create a directory"""
315 if is_hidden(os_path, self.root_dir):
326 if is_hidden(os_path, self.root_dir):
316 raise web.HTTPError(400, u'Cannot create hidden directory %r' % os_path)
327 raise web.HTTPError(400, u'Cannot create hidden directory %r' % os_path)
317 if not os.path.exists(os_path):
328 if not os.path.exists(os_path):
318 os.mkdir(os_path)
329 os.mkdir(os_path)
319 elif not os.path.isdir(os_path):
330 elif not os.path.isdir(os_path):
320 raise web.HTTPError(400, u'Not a directory: %s' % (os_path))
331 raise web.HTTPError(400, u'Not a directory: %s' % (os_path))
321 else:
332 else:
322 self.log.debug("Directory %r already exists", os_path)
333 self.log.debug("Directory %r already exists", os_path)
323
334
324 def save(self, model, path=''):
335 def save(self, model, path=''):
325 """Save the file model and return the model with no content."""
336 """Save the file model and return the model with no content."""
326 path = path.strip('/')
337 path = path.strip('/')
327
338
328 if 'type' not in model:
339 if 'type' not in model:
329 raise web.HTTPError(400, u'No file type provided')
340 raise web.HTTPError(400, u'No file type provided')
330 if 'content' not in model and model['type'] != 'directory':
341 if 'content' not in model and model['type'] != 'directory':
331 raise web.HTTPError(400, u'No file content provided')
342 raise web.HTTPError(400, u'No file content provided')
332
343
333 # One checkpoint should always exist
344 # One checkpoint should always exist
334 if self.file_exists(path) and not self.list_checkpoints(path):
345 if self.file_exists(path) and not self.list_checkpoints(path):
335 self.create_checkpoint(path)
346 self.create_checkpoint(path)
336
347
337 os_path = self._get_os_path(path)
348 os_path = self._get_os_path(path)
338 self.log.debug("Saving %s", os_path)
349 self.log.debug("Saving %s", os_path)
339 try:
350 try:
340 if model['type'] == 'notebook':
351 if model['type'] == 'notebook':
341 self._save_notebook(os_path, model, path)
352 self._save_notebook(os_path, model, path)
342 elif model['type'] == 'file':
353 elif model['type'] == 'file':
343 self._save_file(os_path, model, path)
354 self._save_file(os_path, model, path)
344 elif model['type'] == 'directory':
355 elif model['type'] == 'directory':
345 self._save_directory(os_path, model, path)
356 self._save_directory(os_path, model, path)
346 else:
357 else:
347 raise web.HTTPError(400, "Unhandled contents type: %s" % model['type'])
358 raise web.HTTPError(400, "Unhandled contents type: %s" % model['type'])
348 except web.HTTPError:
359 except web.HTTPError:
349 raise
360 raise
350 except Exception as e:
361 except Exception as e:
351 raise web.HTTPError(400, u'Unexpected error while saving file: %s %s' % (os_path, e))
362 raise web.HTTPError(400, u'Unexpected error while saving file: %s %s' % (os_path, e))
352
363
353 validation_message = None
364 validation_message = None
354 if model['type'] == 'notebook':
365 if model['type'] == 'notebook':
355 self.validate_notebook_model(model)
366 self.validate_notebook_model(model)
356 validation_message = model.get('message', None)
367 validation_message = model.get('message', None)
357
368
358 model = self.get_model(path, content=False)
369 model = self.get_model(path, content=False)
359 if validation_message:
370 if validation_message:
360 model['message'] = validation_message
371 model['message'] = validation_message
361 return model
372 return model
362
373
363 def update(self, model, path):
374 def update(self, model, path):
364 """Update the file's path
375 """Update the file's path
365
376
366 For use in PATCH requests, to enable renaming a file without
377 For use in PATCH requests, to enable renaming a file without
367 re-uploading its contents. Only used for renaming at the moment.
378 re-uploading its contents. Only used for renaming at the moment.
368 """
379 """
369 path = path.strip('/')
380 path = path.strip('/')
370 new_path = model.get('path', path).strip('/')
381 new_path = model.get('path', path).strip('/')
371 if path != new_path:
382 if path != new_path:
372 self.rename(path, new_path)
383 self.rename(path, new_path)
373 model = self.get_model(new_path, content=False)
384 model = self.get_model(new_path, content=False)
374 return model
385 return model
375
386
376 def delete(self, path):
387 def delete(self, path):
377 """Delete file at path."""
388 """Delete file at path."""
378 path = path.strip('/')
389 path = path.strip('/')
379 os_path = self._get_os_path(path)
390 os_path = self._get_os_path(path)
380 rm = os.unlink
391 rm = os.unlink
381 if os.path.isdir(os_path):
392 if os.path.isdir(os_path):
382 listing = os.listdir(os_path)
393 listing = os.listdir(os_path)
383 # don't delete non-empty directories (checkpoints dir doesn't count)
394 # don't delete non-empty directories (checkpoints dir doesn't count)
384 if listing and listing != [self.checkpoint_dir]:
395 if listing and listing != [self.checkpoint_dir]:
385 raise web.HTTPError(400, u'Directory %s not empty' % os_path)
396 raise web.HTTPError(400, u'Directory %s not empty' % os_path)
386 elif not os.path.isfile(os_path):
397 elif not os.path.isfile(os_path):
387 raise web.HTTPError(404, u'File does not exist: %s' % os_path)
398 raise web.HTTPError(404, u'File does not exist: %s' % os_path)
388
399
389 # clear checkpoints
400 # clear checkpoints
390 for checkpoint in self.list_checkpoints(path):
401 for checkpoint in self.list_checkpoints(path):
391 checkpoint_id = checkpoint['id']
402 checkpoint_id = checkpoint['id']
392 cp_path = self.get_checkpoint_path(checkpoint_id, path)
403 cp_path = self.get_checkpoint_path(checkpoint_id, path)
393 if os.path.isfile(cp_path):
404 if os.path.isfile(cp_path):
394 self.log.debug("Unlinking checkpoint %s", cp_path)
405 self.log.debug("Unlinking checkpoint %s", cp_path)
395 os.unlink(cp_path)
406 os.unlink(cp_path)
396
407
397 if os.path.isdir(os_path):
408 if os.path.isdir(os_path):
398 self.log.debug("Removing directory %s", os_path)
409 self.log.debug("Removing directory %s", os_path)
399 shutil.rmtree(os_path)
410 shutil.rmtree(os_path)
400 else:
411 else:
401 self.log.debug("Unlinking file %s", os_path)
412 self.log.debug("Unlinking file %s", os_path)
402 rm(os_path)
413 rm(os_path)
403
414
404 def rename(self, old_path, new_path):
415 def rename(self, old_path, new_path):
405 """Rename a file."""
416 """Rename a file."""
406 old_path = old_path.strip('/')
417 old_path = old_path.strip('/')
407 new_path = new_path.strip('/')
418 new_path = new_path.strip('/')
408 if new_path == old_path:
419 if new_path == old_path:
409 return
420 return
410
421
411 new_os_path = self._get_os_path(new_path)
422 new_os_path = self._get_os_path(new_path)
412 old_os_path = self._get_os_path(old_path)
423 old_os_path = self._get_os_path(old_path)
413
424
414 # Should we proceed with the move?
425 # Should we proceed with the move?
415 if os.path.exists(new_os_path):
426 if os.path.exists(new_os_path):
416 raise web.HTTPError(409, u'File already exists: %s' % new_path)
427 raise web.HTTPError(409, u'File already exists: %s' % new_path)
417
428
418 # Move the file
429 # Move the file
419 try:
430 try:
420 shutil.move(old_os_path, new_os_path)
431 shutil.move(old_os_path, new_os_path)
421 except Exception as e:
432 except Exception as e:
422 raise web.HTTPError(500, u'Unknown error renaming file: %s %s' % (old_path, e))
433 raise web.HTTPError(500, u'Unknown error renaming file: %s %s' % (old_path, e))
423
434
424 # Move the checkpoints
435 # Move the checkpoints
425 old_checkpoints = self.list_checkpoints(old_path)
436 old_checkpoints = self.list_checkpoints(old_path)
426 for cp in old_checkpoints:
437 for cp in old_checkpoints:
427 checkpoint_id = cp['id']
438 checkpoint_id = cp['id']
428 old_cp_path = self.get_checkpoint_path(checkpoint_id, old_path)
439 old_cp_path = self.get_checkpoint_path(checkpoint_id, old_path)
429 new_cp_path = self.get_checkpoint_path(checkpoint_id, new_path)
440 new_cp_path = self.get_checkpoint_path(checkpoint_id, new_path)
430 if os.path.isfile(old_cp_path):
441 if os.path.isfile(old_cp_path):
431 self.log.debug("Renaming checkpoint %s -> %s", old_cp_path, new_cp_path)
442 self.log.debug("Renaming checkpoint %s -> %s", old_cp_path, new_cp_path)
432 shutil.move(old_cp_path, new_cp_path)
443 shutil.move(old_cp_path, new_cp_path)
433
444
434 # Checkpoint-related utilities
445 # Checkpoint-related utilities
435
446
436 def get_checkpoint_path(self, checkpoint_id, path):
447 def get_checkpoint_path(self, checkpoint_id, path):
437 """find the path to a checkpoint"""
448 """find the path to a checkpoint"""
438 path = path.strip('/')
449 path = path.strip('/')
439 parent, name = ('/' + path).rsplit('/', 1)
450 parent, name = ('/' + path).rsplit('/', 1)
440 parent = parent.strip('/')
451 parent = parent.strip('/')
441 basename, ext = os.path.splitext(name)
452 basename, ext = os.path.splitext(name)
442 filename = u"{name}-{checkpoint_id}{ext}".format(
453 filename = u"{name}-{checkpoint_id}{ext}".format(
443 name=basename,
454 name=basename,
444 checkpoint_id=checkpoint_id,
455 checkpoint_id=checkpoint_id,
445 ext=ext,
456 ext=ext,
446 )
457 )
447 os_path = self._get_os_path(path=parent)
458 os_path = self._get_os_path(path=parent)
448 cp_dir = os.path.join(os_path, self.checkpoint_dir)
459 cp_dir = os.path.join(os_path, self.checkpoint_dir)
449 ensure_dir_exists(cp_dir)
460 ensure_dir_exists(cp_dir)
450 cp_path = os.path.join(cp_dir, filename)
461 cp_path = os.path.join(cp_dir, filename)
451 return cp_path
462 return cp_path
452
463
453 def get_checkpoint_model(self, checkpoint_id, path):
464 def get_checkpoint_model(self, checkpoint_id, path):
454 """construct the info dict for a given checkpoint"""
465 """construct the info dict for a given checkpoint"""
455 path = path.strip('/')
466 path = path.strip('/')
456 cp_path = self.get_checkpoint_path(checkpoint_id, path)
467 cp_path = self.get_checkpoint_path(checkpoint_id, path)
457 stats = os.stat(cp_path)
468 stats = os.stat(cp_path)
458 last_modified = tz.utcfromtimestamp(stats.st_mtime)
469 last_modified = tz.utcfromtimestamp(stats.st_mtime)
459 info = dict(
470 info = dict(
460 id = checkpoint_id,
471 id = checkpoint_id,
461 last_modified = last_modified,
472 last_modified = last_modified,
462 )
473 )
463 return info
474 return info
464
475
465 # public checkpoint API
476 # public checkpoint API
466
477
467 def create_checkpoint(self, path):
478 def create_checkpoint(self, path):
468 """Create a checkpoint from the current state of a file"""
479 """Create a checkpoint from the current state of a file"""
469 path = path.strip('/')
480 path = path.strip('/')
470 if not self.file_exists(path):
481 if not self.file_exists(path):
471 raise web.HTTPError(404)
482 raise web.HTTPError(404)
472 src_path = self._get_os_path(path)
483 src_path = self._get_os_path(path)
473 # only the one checkpoint ID:
484 # only the one checkpoint ID:
474 checkpoint_id = u"checkpoint"
485 checkpoint_id = u"checkpoint"
475 cp_path = self.get_checkpoint_path(checkpoint_id, path)
486 cp_path = self.get_checkpoint_path(checkpoint_id, path)
476 self.log.debug("creating checkpoint for %s", path)
487 self.log.debug("creating checkpoint for %s", path)
477 self._copy(src_path, cp_path)
488 self._copy(src_path, cp_path)
478
489
479 # return the checkpoint info
490 # return the checkpoint info
480 return self.get_checkpoint_model(checkpoint_id, path)
491 return self.get_checkpoint_model(checkpoint_id, path)
481
492
482 def list_checkpoints(self, path):
493 def list_checkpoints(self, path):
483 """list the checkpoints for a given file
494 """list the checkpoints for a given file
484
495
485 This contents manager currently only supports one checkpoint per file.
496 This contents manager currently only supports one checkpoint per file.
486 """
497 """
487 path = path.strip('/')
498 path = path.strip('/')
488 checkpoint_id = "checkpoint"
499 checkpoint_id = "checkpoint"
489 os_path = self.get_checkpoint_path(checkpoint_id, path)
500 os_path = self.get_checkpoint_path(checkpoint_id, path)
490 if not os.path.exists(os_path):
501 if not os.path.exists(os_path):
491 return []
502 return []
492 else:
503 else:
493 return [self.get_checkpoint_model(checkpoint_id, path)]
504 return [self.get_checkpoint_model(checkpoint_id, path)]
494
505
495
506
496 def restore_checkpoint(self, checkpoint_id, path):
507 def restore_checkpoint(self, checkpoint_id, path):
497 """restore a file to a checkpointed state"""
508 """restore a file to a checkpointed state"""
498 path = path.strip('/')
509 path = path.strip('/')
499 self.log.info("restoring %s from checkpoint %s", path, checkpoint_id)
510 self.log.info("restoring %s from checkpoint %s", path, checkpoint_id)
500 nb_path = self._get_os_path(path)
511 nb_path = self._get_os_path(path)
501 cp_path = self.get_checkpoint_path(checkpoint_id, path)
512 cp_path = self.get_checkpoint_path(checkpoint_id, path)
502 if not os.path.isfile(cp_path):
513 if not os.path.isfile(cp_path):
503 self.log.debug("checkpoint file does not exist: %s", cp_path)
514 self.log.debug("checkpoint file does not exist: %s", cp_path)
504 raise web.HTTPError(404,
515 raise web.HTTPError(404,
505 u'checkpoint does not exist: %s@%s' % (path, checkpoint_id)
516 u'checkpoint does not exist: %s@%s' % (path, checkpoint_id)
506 )
517 )
507 # ensure notebook is readable (never restore from an unreadable notebook)
518 # ensure notebook is readable (never restore from an unreadable notebook)
508 if cp_path.endswith('.ipynb'):
519 if cp_path.endswith('.ipynb'):
509 with io.open(cp_path, 'r', encoding='utf-8') as f:
520 with io.open(cp_path, 'r', encoding='utf-8') as f:
510 nbformat.read(f, as_version=4)
521 nbformat.read(f, as_version=4)
511 self._copy(cp_path, nb_path)
522 self._copy(cp_path, nb_path)
512 self.log.debug("copying %s -> %s", cp_path, nb_path)
523 self.log.debug("copying %s -> %s", cp_path, nb_path)
513
524
514 def delete_checkpoint(self, checkpoint_id, path):
525 def delete_checkpoint(self, checkpoint_id, path):
515 """delete a file's checkpoint"""
526 """delete a file's checkpoint"""
516 path = path.strip('/')
527 path = path.strip('/')
517 cp_path = self.get_checkpoint_path(checkpoint_id, path)
528 cp_path = self.get_checkpoint_path(checkpoint_id, path)
518 if not os.path.isfile(cp_path):
529 if not os.path.isfile(cp_path):
519 raise web.HTTPError(404,
530 raise web.HTTPError(404,
520 u'Checkpoint does not exist: %s@%s' % (path, checkpoint_id)
531 u'Checkpoint does not exist: %s@%s' % (path, checkpoint_id)
521 )
532 )
522 self.log.debug("unlinking %s", cp_path)
533 self.log.debug("unlinking %s", cp_path)
523 os.unlink(cp_path)
534 os.unlink(cp_path)
524
535
525 def info_string(self):
536 def info_string(self):
526 return "Serving notebooks from local directory: %s" % self.root_dir
537 return "Serving notebooks from local directory: %s" % self.root_dir
527
538
528 def get_kernel_path(self, path, model=None):
539 def get_kernel_path(self, path, model=None):
529 """Return the initial working dir a kernel associated with a given notebook"""
540 """Return the initial working dir a kernel associated with a given notebook"""
530 if '/' in path:
541 if '/' in path:
531 parent_dir = path.rsplit('/', 1)[0]
542 parent_dir = path.rsplit('/', 1)[0]
532 else:
543 else:
533 parent_dir = ''
544 parent_dir = ''
534 return self._get_os_path(parent_dir)
545 return self._get_os_path(parent_dir)
@@ -1,257 +1,261 b''
1 """Tornado handlers for the contents web service."""
1 """Tornado handlers for the contents web service."""
2
2
3 # Copyright (c) IPython Development Team.
3 # Copyright (c) IPython Development Team.
4 # Distributed under the terms of the Modified BSD License.
4 # Distributed under the terms of the Modified BSD License.
5
5
6 import json
6 import json
7
7
8 from tornado import web
8 from tornado import web
9
9
10 from IPython.html.utils import url_path_join, url_escape
10 from IPython.html.utils import url_path_join, url_escape
11 from IPython.utils.jsonutil import date_default
11 from IPython.utils.jsonutil import date_default
12
12
13 from IPython.html.base.handlers import (
13 from IPython.html.base.handlers import (
14 IPythonHandler, json_errors, path_regex,
14 IPythonHandler, json_errors, path_regex,
15 )
15 )
16
16
17
17
18 def sort_key(model):
18 def sort_key(model):
19 """key function for case-insensitive sort by name and type"""
19 """key function for case-insensitive sort by name and type"""
20 iname = model['name'].lower()
20 iname = model['name'].lower()
21 type_key = {
21 type_key = {
22 'directory' : '0',
22 'directory' : '0',
23 'notebook' : '1',
23 'notebook' : '1',
24 'file' : '2',
24 'file' : '2',
25 }.get(model['type'], '9')
25 }.get(model['type'], '9')
26 return u'%s%s' % (type_key, iname)
26 return u'%s%s' % (type_key, iname)
27
27
28 class ContentsHandler(IPythonHandler):
28 class ContentsHandler(IPythonHandler):
29
29
30 SUPPORTED_METHODS = (u'GET', u'PUT', u'PATCH', u'POST', u'DELETE')
30 SUPPORTED_METHODS = (u'GET', u'PUT', u'PATCH', u'POST', u'DELETE')
31
31
32 def location_url(self, path):
32 def location_url(self, path):
33 """Return the full URL location of a file.
33 """Return the full URL location of a file.
34
34
35 Parameters
35 Parameters
36 ----------
36 ----------
37 path : unicode
37 path : unicode
38 The API path of the file, such as "foo/bar.txt".
38 The API path of the file, such as "foo/bar.txt".
39 """
39 """
40 return url_escape(url_path_join(
40 return url_escape(url_path_join(
41 self.base_url, 'api', 'contents', path
41 self.base_url, 'api', 'contents', path
42 ))
42 ))
43
43
44 def _finish_model(self, model, location=True):
44 def _finish_model(self, model, location=True):
45 """Finish a JSON request with a model, setting relevant headers, etc."""
45 """Finish a JSON request with a model, setting relevant headers, etc."""
46 if location:
46 if location:
47 location = self.location_url(model['path'])
47 location = self.location_url(model['path'])
48 self.set_header('Location', location)
48 self.set_header('Location', location)
49 self.set_header('Last-Modified', model['last_modified'])
49 self.set_header('Last-Modified', model['last_modified'])
50 self.finish(json.dumps(model, default=date_default))
50 self.finish(json.dumps(model, default=date_default))
51
51
52 @web.authenticated
52 @web.authenticated
53 @json_errors
53 @json_errors
54 def get(self, path=''):
54 def get(self, path=''):
55 """Return a model for a file or directory.
55 """Return a model for a file or directory.
56
56
57 A directory model contains a list of models (without content)
57 A directory model contains a list of models (without content)
58 of the files and directories it contains.
58 of the files and directories it contains.
59 """
59 """
60 path = path or ''
60 path = path or ''
61 model = self.contents_manager.get_model(path=path)
61 type_ = self.get_query_argument('type', default=None)
62 if type_ not in {None, 'directory', 'file', 'notebook'}:
63 raise web.HTTPError(400, u'Type %r is invalid' % type_)
64
65 model = self.contents_manager.get_model(path=path, type_=type_)
62 if model['type'] == 'directory':
66 if model['type'] == 'directory':
63 # group listing by type, then by name (case-insensitive)
67 # group listing by type, then by name (case-insensitive)
64 # FIXME: sorting should be done in the frontends
68 # FIXME: sorting should be done in the frontends
65 model['content'].sort(key=sort_key)
69 model['content'].sort(key=sort_key)
66 self._finish_model(model, location=False)
70 self._finish_model(model, location=False)
67
71
68 @web.authenticated
72 @web.authenticated
69 @json_errors
73 @json_errors
70 def patch(self, path=''):
74 def patch(self, path=''):
71 """PATCH renames a file or directory without re-uploading content."""
75 """PATCH renames a file or directory without re-uploading content."""
72 cm = self.contents_manager
76 cm = self.contents_manager
73 model = self.get_json_body()
77 model = self.get_json_body()
74 if model is None:
78 if model is None:
75 raise web.HTTPError(400, u'JSON body missing')
79 raise web.HTTPError(400, u'JSON body missing')
76 model = cm.update(model, path)
80 model = cm.update(model, path)
77 self._finish_model(model)
81 self._finish_model(model)
78
82
79 def _copy(self, copy_from, copy_to=None):
83 def _copy(self, copy_from, copy_to=None):
80 """Copy a file, optionally specifying a target directory."""
84 """Copy a file, optionally specifying a target directory."""
81 self.log.info(u"Copying {copy_from} to {copy_to}".format(
85 self.log.info(u"Copying {copy_from} to {copy_to}".format(
82 copy_from=copy_from,
86 copy_from=copy_from,
83 copy_to=copy_to or '',
87 copy_to=copy_to or '',
84 ))
88 ))
85 model = self.contents_manager.copy(copy_from, copy_to)
89 model = self.contents_manager.copy(copy_from, copy_to)
86 self.set_status(201)
90 self.set_status(201)
87 self._finish_model(model)
91 self._finish_model(model)
88
92
89 def _upload(self, model, path):
93 def _upload(self, model, path):
90 """Handle upload of a new file to path"""
94 """Handle upload of a new file to path"""
91 self.log.info(u"Uploading file to %s", path)
95 self.log.info(u"Uploading file to %s", path)
92 model = self.contents_manager.new(model, path)
96 model = self.contents_manager.new(model, path)
93 self.set_status(201)
97 self.set_status(201)
94 self._finish_model(model)
98 self._finish_model(model)
95
99
96 def _new_untitled(self, path, type='', ext=''):
100 def _new_untitled(self, path, type='', ext=''):
97 """Create a new, empty untitled entity"""
101 """Create a new, empty untitled entity"""
98 self.log.info(u"Creating new %s in %s", type or 'file', path)
102 self.log.info(u"Creating new %s in %s", type or 'file', path)
99 model = self.contents_manager.new_untitled(path=path, type=type, ext=ext)
103 model = self.contents_manager.new_untitled(path=path, type=type, ext=ext)
100 self.set_status(201)
104 self.set_status(201)
101 self._finish_model(model)
105 self._finish_model(model)
102
106
103 def _save(self, model, path):
107 def _save(self, model, path):
104 """Save an existing file."""
108 """Save an existing file."""
105 self.log.info(u"Saving file at %s", path)
109 self.log.info(u"Saving file at %s", path)
106 model = self.contents_manager.save(model, path)
110 model = self.contents_manager.save(model, path)
107 self._finish_model(model)
111 self._finish_model(model)
108
112
109 @web.authenticated
113 @web.authenticated
110 @json_errors
114 @json_errors
111 def post(self, path=''):
115 def post(self, path=''):
112 """Create a new file in the specified path.
116 """Create a new file in the specified path.
113
117
114 POST creates new files. The server always decides on the name.
118 POST creates new files. The server always decides on the name.
115
119
116 POST /api/contents/path
120 POST /api/contents/path
117 New untitled, empty file or directory.
121 New untitled, empty file or directory.
118 POST /api/contents/path
122 POST /api/contents/path
119 with body {"copy_from" : "/path/to/OtherNotebook.ipynb"}
123 with body {"copy_from" : "/path/to/OtherNotebook.ipynb"}
120 New copy of OtherNotebook in path
124 New copy of OtherNotebook in path
121 """
125 """
122
126
123 cm = self.contents_manager
127 cm = self.contents_manager
124
128
125 if cm.file_exists(path):
129 if cm.file_exists(path):
126 raise web.HTTPError(400, "Cannot POST to files, use PUT instead.")
130 raise web.HTTPError(400, "Cannot POST to files, use PUT instead.")
127
131
128 if not cm.dir_exists(path):
132 if not cm.dir_exists(path):
129 raise web.HTTPError(404, "No such directory: %s" % path)
133 raise web.HTTPError(404, "No such directory: %s" % path)
130
134
131 model = self.get_json_body()
135 model = self.get_json_body()
132
136
133 if model is not None:
137 if model is not None:
134 copy_from = model.get('copy_from')
138 copy_from = model.get('copy_from')
135 ext = model.get('ext', '')
139 ext = model.get('ext', '')
136 type = model.get('type', '')
140 type = model.get('type', '')
137 if copy_from:
141 if copy_from:
138 self._copy(copy_from, path)
142 self._copy(copy_from, path)
139 else:
143 else:
140 self._new_untitled(path, type=type, ext=ext)
144 self._new_untitled(path, type=type, ext=ext)
141 else:
145 else:
142 self._new_untitled(path)
146 self._new_untitled(path)
143
147
144 @web.authenticated
148 @web.authenticated
145 @json_errors
149 @json_errors
146 def put(self, path=''):
150 def put(self, path=''):
147 """Saves the file in the location specified by name and path.
151 """Saves the file in the location specified by name and path.
148
152
149 PUT is very similar to POST, but the requester specifies the name,
153 PUT is very similar to POST, but the requester specifies the name,
150 whereas with POST, the server picks the name.
154 whereas with POST, the server picks the name.
151
155
152 PUT /api/contents/path/Name.ipynb
156 PUT /api/contents/path/Name.ipynb
153 Save notebook at ``path/Name.ipynb``. Notebook structure is specified
157 Save notebook at ``path/Name.ipynb``. Notebook structure is specified
154 in `content` key of JSON request body. If content is not specified,
158 in `content` key of JSON request body. If content is not specified,
155 create a new empty notebook.
159 create a new empty notebook.
156 """
160 """
157 model = self.get_json_body()
161 model = self.get_json_body()
158 if model:
162 if model:
159 if model.get('copy_from'):
163 if model.get('copy_from'):
160 raise web.HTTPError(400, "Cannot copy with PUT, only POST")
164 raise web.HTTPError(400, "Cannot copy with PUT, only POST")
161 if self.contents_manager.file_exists(path):
165 if self.contents_manager.file_exists(path):
162 self._save(model, path)
166 self._save(model, path)
163 else:
167 else:
164 self._upload(model, path)
168 self._upload(model, path)
165 else:
169 else:
166 self._new_untitled(path)
170 self._new_untitled(path)
167
171
168 @web.authenticated
172 @web.authenticated
169 @json_errors
173 @json_errors
170 def delete(self, path=''):
174 def delete(self, path=''):
171 """delete a file in the given path"""
175 """delete a file in the given path"""
172 cm = self.contents_manager
176 cm = self.contents_manager
173 self.log.warn('delete %s', path)
177 self.log.warn('delete %s', path)
174 cm.delete(path)
178 cm.delete(path)
175 self.set_status(204)
179 self.set_status(204)
176 self.finish()
180 self.finish()
177
181
178
182
179 class CheckpointsHandler(IPythonHandler):
183 class CheckpointsHandler(IPythonHandler):
180
184
181 SUPPORTED_METHODS = ('GET', 'POST')
185 SUPPORTED_METHODS = ('GET', 'POST')
182
186
183 @web.authenticated
187 @web.authenticated
184 @json_errors
188 @json_errors
185 def get(self, path=''):
189 def get(self, path=''):
186 """get lists checkpoints for a file"""
190 """get lists checkpoints for a file"""
187 cm = self.contents_manager
191 cm = self.contents_manager
188 checkpoints = cm.list_checkpoints(path)
192 checkpoints = cm.list_checkpoints(path)
189 data = json.dumps(checkpoints, default=date_default)
193 data = json.dumps(checkpoints, default=date_default)
190 self.finish(data)
194 self.finish(data)
191
195
192 @web.authenticated
196 @web.authenticated
193 @json_errors
197 @json_errors
194 def post(self, path=''):
198 def post(self, path=''):
195 """post creates a new checkpoint"""
199 """post creates a new checkpoint"""
196 cm = self.contents_manager
200 cm = self.contents_manager
197 checkpoint = cm.create_checkpoint(path)
201 checkpoint = cm.create_checkpoint(path)
198 data = json.dumps(checkpoint, default=date_default)
202 data = json.dumps(checkpoint, default=date_default)
199 location = url_path_join(self.base_url, 'api/contents',
203 location = url_path_join(self.base_url, 'api/contents',
200 path, 'checkpoints', checkpoint['id'])
204 path, 'checkpoints', checkpoint['id'])
201 self.set_header('Location', url_escape(location))
205 self.set_header('Location', url_escape(location))
202 self.set_status(201)
206 self.set_status(201)
203 self.finish(data)
207 self.finish(data)
204
208
205
209
206 class ModifyCheckpointsHandler(IPythonHandler):
210 class ModifyCheckpointsHandler(IPythonHandler):
207
211
208 SUPPORTED_METHODS = ('POST', 'DELETE')
212 SUPPORTED_METHODS = ('POST', 'DELETE')
209
213
210 @web.authenticated
214 @web.authenticated
211 @json_errors
215 @json_errors
212 def post(self, path, checkpoint_id):
216 def post(self, path, checkpoint_id):
213 """post restores a file from a checkpoint"""
217 """post restores a file from a checkpoint"""
214 cm = self.contents_manager
218 cm = self.contents_manager
215 cm.restore_checkpoint(checkpoint_id, path)
219 cm.restore_checkpoint(checkpoint_id, path)
216 self.set_status(204)
220 self.set_status(204)
217 self.finish()
221 self.finish()
218
222
219 @web.authenticated
223 @web.authenticated
220 @json_errors
224 @json_errors
221 def delete(self, path, checkpoint_id):
225 def delete(self, path, checkpoint_id):
222 """delete clears a checkpoint for a given file"""
226 """delete clears a checkpoint for a given file"""
223 cm = self.contents_manager
227 cm = self.contents_manager
224 cm.delete_checkpoint(checkpoint_id, path)
228 cm.delete_checkpoint(checkpoint_id, path)
225 self.set_status(204)
229 self.set_status(204)
226 self.finish()
230 self.finish()
227
231
228
232
229 class NotebooksRedirectHandler(IPythonHandler):
233 class NotebooksRedirectHandler(IPythonHandler):
230 """Redirect /api/notebooks to /api/contents"""
234 """Redirect /api/notebooks to /api/contents"""
231 SUPPORTED_METHODS = ('GET', 'PUT', 'PATCH', 'POST', 'DELETE')
235 SUPPORTED_METHODS = ('GET', 'PUT', 'PATCH', 'POST', 'DELETE')
232
236
233 def get(self, path):
237 def get(self, path):
234 self.log.warn("/api/notebooks is deprecated, use /api/contents")
238 self.log.warn("/api/notebooks is deprecated, use /api/contents")
235 self.redirect(url_path_join(
239 self.redirect(url_path_join(
236 self.base_url,
240 self.base_url,
237 'api/contents',
241 'api/contents',
238 path
242 path
239 ))
243 ))
240
244
241 put = patch = post = delete = get
245 put = patch = post = delete = get
242
246
243
247
244 #-----------------------------------------------------------------------------
248 #-----------------------------------------------------------------------------
245 # URL to handler mappings
249 # URL to handler mappings
246 #-----------------------------------------------------------------------------
250 #-----------------------------------------------------------------------------
247
251
248
252
249 _checkpoint_id_regex = r"(?P<checkpoint_id>[\w-]+)"
253 _checkpoint_id_regex = r"(?P<checkpoint_id>[\w-]+)"
250
254
251 default_handlers = [
255 default_handlers = [
252 (r"/api/contents%s/checkpoints" % path_regex, CheckpointsHandler),
256 (r"/api/contents%s/checkpoints" % path_regex, CheckpointsHandler),
253 (r"/api/contents%s/checkpoints/%s" % (path_regex, _checkpoint_id_regex),
257 (r"/api/contents%s/checkpoints/%s" % (path_regex, _checkpoint_id_regex),
254 ModifyCheckpointsHandler),
258 ModifyCheckpointsHandler),
255 (r"/api/contents%s" % path_regex, ContentsHandler),
259 (r"/api/contents%s" % path_regex, ContentsHandler),
256 (r"/api/notebooks/?(.*)", NotebooksRedirectHandler),
260 (r"/api/notebooks/?(.*)", NotebooksRedirectHandler),
257 ]
261 ]
@@ -1,373 +1,373 b''
1 """A base class for contents managers."""
1 """A base class for contents managers."""
2
2
3 # Copyright (c) IPython Development Team.
3 # Copyright (c) IPython Development Team.
4 # Distributed under the terms of the Modified BSD License.
4 # Distributed under the terms of the Modified BSD License.
5
5
6 from fnmatch import fnmatch
6 from fnmatch import fnmatch
7 import itertools
7 import itertools
8 import json
8 import json
9 import os
9 import os
10
10
11 from tornado.web import HTTPError
11 from tornado.web import HTTPError
12
12
13 from IPython.config.configurable import LoggingConfigurable
13 from IPython.config.configurable import LoggingConfigurable
14 from IPython.nbformat import sign, validate, ValidationError
14 from IPython.nbformat import sign, validate, ValidationError
15 from IPython.nbformat.v4 import new_notebook
15 from IPython.nbformat.v4 import new_notebook
16 from IPython.utils.traitlets import Instance, Unicode, List
16 from IPython.utils.traitlets import Instance, Unicode, List
17
17
18
18
19 class ContentsManager(LoggingConfigurable):
19 class ContentsManager(LoggingConfigurable):
20 """Base class for serving files and directories.
20 """Base class for serving files and directories.
21
21
22 This serves any text or binary file,
22 This serves any text or binary file,
23 as well as directories,
23 as well as directories,
24 with special handling for JSON notebook documents.
24 with special handling for JSON notebook documents.
25
25
26 Most APIs take a path argument,
26 Most APIs take a path argument,
27 which is always an API-style unicode path,
27 which is always an API-style unicode path,
28 and always refers to a directory.
28 and always refers to a directory.
29
29
30 - unicode, not url-escaped
30 - unicode, not url-escaped
31 - '/'-separated
31 - '/'-separated
32 - leading and trailing '/' will be stripped
32 - leading and trailing '/' will be stripped
33 - if unspecified, path defaults to '',
33 - if unspecified, path defaults to '',
34 indicating the root path.
34 indicating the root path.
35
35
36 """
36 """
37
37
38 notary = Instance(sign.NotebookNotary)
38 notary = Instance(sign.NotebookNotary)
39 def _notary_default(self):
39 def _notary_default(self):
40 return sign.NotebookNotary(parent=self)
40 return sign.NotebookNotary(parent=self)
41
41
42 hide_globs = List(Unicode, [
42 hide_globs = List(Unicode, [
43 u'__pycache__', '*.pyc', '*.pyo',
43 u'__pycache__', '*.pyc', '*.pyo',
44 '.DS_Store', '*.so', '*.dylib', '*~',
44 '.DS_Store', '*.so', '*.dylib', '*~',
45 ], config=True, help="""
45 ], config=True, help="""
46 Glob patterns to hide in file and directory listings.
46 Glob patterns to hide in file and directory listings.
47 """)
47 """)
48
48
49 untitled_notebook = Unicode("Untitled", config=True,
49 untitled_notebook = Unicode("Untitled", config=True,
50 help="The base name used when creating untitled notebooks."
50 help="The base name used when creating untitled notebooks."
51 )
51 )
52
52
53 untitled_file = Unicode("untitled", config=True,
53 untitled_file = Unicode("untitled", config=True,
54 help="The base name used when creating untitled files."
54 help="The base name used when creating untitled files."
55 )
55 )
56
56
57 untitled_directory = Unicode("Untitled Folder", config=True,
57 untitled_directory = Unicode("Untitled Folder", config=True,
58 help="The base name used when creating untitled directories."
58 help="The base name used when creating untitled directories."
59 )
59 )
60
60
61 # ContentsManager API part 1: methods that must be
61 # ContentsManager API part 1: methods that must be
62 # implemented in subclasses.
62 # implemented in subclasses.
63
63
64 def dir_exists(self, path):
64 def dir_exists(self, path):
65 """Does the API-style path (directory) actually exist?
65 """Does the API-style path (directory) actually exist?
66
66
67 Like os.path.isdir
67 Like os.path.isdir
68
68
69 Override this method in subclasses.
69 Override this method in subclasses.
70
70
71 Parameters
71 Parameters
72 ----------
72 ----------
73 path : string
73 path : string
74 The path to check
74 The path to check
75
75
76 Returns
76 Returns
77 -------
77 -------
78 exists : bool
78 exists : bool
79 Whether the path does indeed exist.
79 Whether the path does indeed exist.
80 """
80 """
81 raise NotImplementedError
81 raise NotImplementedError
82
82
83 def is_hidden(self, path):
83 def is_hidden(self, path):
84 """Does the API style path correspond to a hidden directory or file?
84 """Does the API style path correspond to a hidden directory or file?
85
85
86 Parameters
86 Parameters
87 ----------
87 ----------
88 path : string
88 path : string
89 The path to check. This is an API path (`/` separated,
89 The path to check. This is an API path (`/` separated,
90 relative to root dir).
90 relative to root dir).
91
91
92 Returns
92 Returns
93 -------
93 -------
94 hidden : bool
94 hidden : bool
95 Whether the path is hidden.
95 Whether the path is hidden.
96
96
97 """
97 """
98 raise NotImplementedError
98 raise NotImplementedError
99
99
100 def file_exists(self, path=''):
100 def file_exists(self, path=''):
101 """Does a file exist at the given path?
101 """Does a file exist at the given path?
102
102
103 Like os.path.isfile
103 Like os.path.isfile
104
104
105 Override this method in subclasses.
105 Override this method in subclasses.
106
106
107 Parameters
107 Parameters
108 ----------
108 ----------
109 name : string
109 name : string
110 The name of the file you are checking.
110 The name of the file you are checking.
111 path : string
111 path : string
112 The relative path to the file's directory (with '/' as separator)
112 The relative path to the file's directory (with '/' as separator)
113
113
114 Returns
114 Returns
115 -------
115 -------
116 exists : bool
116 exists : bool
117 Whether the file exists.
117 Whether the file exists.
118 """
118 """
119 raise NotImplementedError('must be implemented in a subclass')
119 raise NotImplementedError('must be implemented in a subclass')
120
120
121 def exists(self, path):
121 def exists(self, path):
122 """Does a file or directory exist at the given path?
122 """Does a file or directory exist at the given path?
123
123
124 Like os.path.exists
124 Like os.path.exists
125
125
126 Parameters
126 Parameters
127 ----------
127 ----------
128 path : string
128 path : string
129 The relative path to the file's directory (with '/' as separator)
129 The relative path to the file's directory (with '/' as separator)
130
130
131 Returns
131 Returns
132 -------
132 -------
133 exists : bool
133 exists : bool
134 Whether the target exists.
134 Whether the target exists.
135 """
135 """
136 return self.file_exists(path) or self.dir_exists(path)
136 return self.file_exists(path) or self.dir_exists(path)
137
137
138 def get_model(self, path, content=True):
138 def get_model(self, path, content=True, type_=None):
139 """Get the model of a file or directory with or without content."""
139 """Get the model of a file or directory with or without content."""
140 raise NotImplementedError('must be implemented in a subclass')
140 raise NotImplementedError('must be implemented in a subclass')
141
141
142 def save(self, model, path):
142 def save(self, model, path):
143 """Save the file or directory and return the model with no content."""
143 """Save the file or directory and return the model with no content."""
144 raise NotImplementedError('must be implemented in a subclass')
144 raise NotImplementedError('must be implemented in a subclass')
145
145
146 def update(self, model, path):
146 def update(self, model, path):
147 """Update the file or directory and return the model with no content.
147 """Update the file or directory and return the model with no content.
148
148
149 For use in PATCH requests, to enable renaming a file without
149 For use in PATCH requests, to enable renaming a file without
150 re-uploading its contents. Only used for renaming at the moment.
150 re-uploading its contents. Only used for renaming at the moment.
151 """
151 """
152 raise NotImplementedError('must be implemented in a subclass')
152 raise NotImplementedError('must be implemented in a subclass')
153
153
154 def delete(self, path):
154 def delete(self, path):
155 """Delete file or directory by path."""
155 """Delete file or directory by path."""
156 raise NotImplementedError('must be implemented in a subclass')
156 raise NotImplementedError('must be implemented in a subclass')
157
157
158 def create_checkpoint(self, path):
158 def create_checkpoint(self, path):
159 """Create a checkpoint of the current state of a file
159 """Create a checkpoint of the current state of a file
160
160
161 Returns a checkpoint_id for the new checkpoint.
161 Returns a checkpoint_id for the new checkpoint.
162 """
162 """
163 raise NotImplementedError("must be implemented in a subclass")
163 raise NotImplementedError("must be implemented in a subclass")
164
164
165 def list_checkpoints(self, path):
165 def list_checkpoints(self, path):
166 """Return a list of checkpoints for a given file"""
166 """Return a list of checkpoints for a given file"""
167 return []
167 return []
168
168
169 def restore_checkpoint(self, checkpoint_id, path):
169 def restore_checkpoint(self, checkpoint_id, path):
170 """Restore a file from one of its checkpoints"""
170 """Restore a file from one of its checkpoints"""
171 raise NotImplementedError("must be implemented in a subclass")
171 raise NotImplementedError("must be implemented in a subclass")
172
172
173 def delete_checkpoint(self, checkpoint_id, path):
173 def delete_checkpoint(self, checkpoint_id, path):
174 """delete a checkpoint for a file"""
174 """delete a checkpoint for a file"""
175 raise NotImplementedError("must be implemented in a subclass")
175 raise NotImplementedError("must be implemented in a subclass")
176
176
177 # ContentsManager API part 2: methods that have useable default
177 # ContentsManager API part 2: methods that have useable default
178 # implementations, but can be overridden in subclasses.
178 # implementations, but can be overridden in subclasses.
179
179
180 def info_string(self):
180 def info_string(self):
181 return "Serving contents"
181 return "Serving contents"
182
182
183 def get_kernel_path(self, path, model=None):
183 def get_kernel_path(self, path, model=None):
184 """Return the API path for the kernel
184 """Return the API path for the kernel
185
185
186 KernelManagers can turn this value into a filesystem path,
186 KernelManagers can turn this value into a filesystem path,
187 or ignore it altogether.
187 or ignore it altogether.
188 """
188 """
189 return path
189 return path
190
190
191 def increment_filename(self, filename, path=''):
191 def increment_filename(self, filename, path=''):
192 """Increment a filename until it is unique.
192 """Increment a filename until it is unique.
193
193
194 Parameters
194 Parameters
195 ----------
195 ----------
196 filename : unicode
196 filename : unicode
197 The name of a file, including extension
197 The name of a file, including extension
198 path : unicode
198 path : unicode
199 The API path of the target's directory
199 The API path of the target's directory
200
200
201 Returns
201 Returns
202 -------
202 -------
203 name : unicode
203 name : unicode
204 A filename that is unique, based on the input filename.
204 A filename that is unique, based on the input filename.
205 """
205 """
206 path = path.strip('/')
206 path = path.strip('/')
207 basename, ext = os.path.splitext(filename)
207 basename, ext = os.path.splitext(filename)
208 for i in itertools.count():
208 for i in itertools.count():
209 name = u'{basename}{i}{ext}'.format(basename=basename, i=i,
209 name = u'{basename}{i}{ext}'.format(basename=basename, i=i,
210 ext=ext)
210 ext=ext)
211 if not self.exists(u'{}/{}'.format(path, name)):
211 if not self.exists(u'{}/{}'.format(path, name)):
212 break
212 break
213 return name
213 return name
214
214
215 def validate_notebook_model(self, model):
215 def validate_notebook_model(self, model):
216 """Add failed-validation message to model"""
216 """Add failed-validation message to model"""
217 try:
217 try:
218 validate(model['content'])
218 validate(model['content'])
219 except ValidationError as e:
219 except ValidationError as e:
220 model['message'] = u'Notebook Validation failed: {}:\n{}'.format(
220 model['message'] = u'Notebook Validation failed: {}:\n{}'.format(
221 e.message, json.dumps(e.instance, indent=1, default=lambda obj: '<UNKNOWN>'),
221 e.message, json.dumps(e.instance, indent=1, default=lambda obj: '<UNKNOWN>'),
222 )
222 )
223 return model
223 return model
224
224
225 def new_untitled(self, path='', type='', ext=''):
225 def new_untitled(self, path='', type='', ext=''):
226 """Create a new untitled file or directory in path
226 """Create a new untitled file or directory in path
227
227
228 path must be a directory
228 path must be a directory
229
229
230 File extension can be specified.
230 File extension can be specified.
231
231
232 Use `new` to create files with a fully specified path (including filename).
232 Use `new` to create files with a fully specified path (including filename).
233 """
233 """
234 path = path.strip('/')
234 path = path.strip('/')
235 if not self.dir_exists(path):
235 if not self.dir_exists(path):
236 raise HTTPError(404, 'No such directory: %s' % path)
236 raise HTTPError(404, 'No such directory: %s' % path)
237
237
238 model = {}
238 model = {}
239 if type:
239 if type:
240 model['type'] = type
240 model['type'] = type
241
241
242 if ext == '.ipynb':
242 if ext == '.ipynb':
243 model.setdefault('type', 'notebook')
243 model.setdefault('type', 'notebook')
244 else:
244 else:
245 model.setdefault('type', 'file')
245 model.setdefault('type', 'file')
246
246
247 if model['type'] == 'directory':
247 if model['type'] == 'directory':
248 untitled = self.untitled_directory
248 untitled = self.untitled_directory
249 elif model['type'] == 'notebook':
249 elif model['type'] == 'notebook':
250 untitled = self.untitled_notebook
250 untitled = self.untitled_notebook
251 ext = '.ipynb'
251 ext = '.ipynb'
252 elif model['type'] == 'file':
252 elif model['type'] == 'file':
253 untitled = self.untitled_file
253 untitled = self.untitled_file
254 else:
254 else:
255 raise HTTPError(400, "Unexpected model type: %r" % model['type'])
255 raise HTTPError(400, "Unexpected model type: %r" % model['type'])
256
256
257 name = self.increment_filename(untitled + ext, path)
257 name = self.increment_filename(untitled + ext, path)
258 path = u'{0}/{1}'.format(path, name)
258 path = u'{0}/{1}'.format(path, name)
259 return self.new(model, path)
259 return self.new(model, path)
260
260
261 def new(self, model=None, path=''):
261 def new(self, model=None, path=''):
262 """Create a new file or directory and return its model with no content.
262 """Create a new file or directory and return its model with no content.
263
263
264 To create a new untitled entity in a directory, use `new_untitled`.
264 To create a new untitled entity in a directory, use `new_untitled`.
265 """
265 """
266 path = path.strip('/')
266 path = path.strip('/')
267 if model is None:
267 if model is None:
268 model = {}
268 model = {}
269
269
270 if path.endswith('.ipynb'):
270 if path.endswith('.ipynb'):
271 model.setdefault('type', 'notebook')
271 model.setdefault('type', 'notebook')
272 else:
272 else:
273 model.setdefault('type', 'file')
273 model.setdefault('type', 'file')
274
274
275 # no content, not a directory, so fill out new-file model
275 # no content, not a directory, so fill out new-file model
276 if 'content' not in model and model['type'] != 'directory':
276 if 'content' not in model and model['type'] != 'directory':
277 if model['type'] == 'notebook':
277 if model['type'] == 'notebook':
278 model['content'] = new_notebook()
278 model['content'] = new_notebook()
279 model['format'] = 'json'
279 model['format'] = 'json'
280 else:
280 else:
281 model['content'] = ''
281 model['content'] = ''
282 model['type'] = 'file'
282 model['type'] = 'file'
283 model['format'] = 'text'
283 model['format'] = 'text'
284
284
285 model = self.save(model, path)
285 model = self.save(model, path)
286 return model
286 return model
287
287
288 def copy(self, from_path, to_path=None):
288 def copy(self, from_path, to_path=None):
289 """Copy an existing file and return its new model.
289 """Copy an existing file and return its new model.
290
290
291 If to_path not specified, it will be the parent directory of from_path.
291 If to_path not specified, it will be the parent directory of from_path.
292 If to_path is a directory, filename will increment `from_path-Copy#.ext`.
292 If to_path is a directory, filename will increment `from_path-Copy#.ext`.
293
293
294 from_path must be a full path to a file.
294 from_path must be a full path to a file.
295 """
295 """
296 path = from_path.strip('/')
296 path = from_path.strip('/')
297 if '/' in path:
297 if '/' in path:
298 from_dir, from_name = path.rsplit('/', 1)
298 from_dir, from_name = path.rsplit('/', 1)
299 else:
299 else:
300 from_dir = ''
300 from_dir = ''
301 from_name = path
301 from_name = path
302
302
303 model = self.get_model(path)
303 model = self.get_model(path)
304 model.pop('path', None)
304 model.pop('path', None)
305 model.pop('name', None)
305 model.pop('name', None)
306 if model['type'] == 'directory':
306 if model['type'] == 'directory':
307 raise HTTPError(400, "Can't copy directories")
307 raise HTTPError(400, "Can't copy directories")
308
308
309 if not to_path:
309 if not to_path:
310 to_path = from_dir
310 to_path = from_dir
311 if self.dir_exists(to_path):
311 if self.dir_exists(to_path):
312 base, ext = os.path.splitext(from_name)
312 base, ext = os.path.splitext(from_name)
313 copy_name = u'{0}-Copy{1}'.format(base, ext)
313 copy_name = u'{0}-Copy{1}'.format(base, ext)
314 to_name = self.increment_filename(copy_name, to_path)
314 to_name = self.increment_filename(copy_name, to_path)
315 to_path = u'{0}/{1}'.format(to_path, to_name)
315 to_path = u'{0}/{1}'.format(to_path, to_name)
316
316
317 model = self.save(model, to_path)
317 model = self.save(model, to_path)
318 return model
318 return model
319
319
320 def log_info(self):
320 def log_info(self):
321 self.log.info(self.info_string())
321 self.log.info(self.info_string())
322
322
323 def trust_notebook(self, path):
323 def trust_notebook(self, path):
324 """Explicitly trust a notebook
324 """Explicitly trust a notebook
325
325
326 Parameters
326 Parameters
327 ----------
327 ----------
328 path : string
328 path : string
329 The path of a notebook
329 The path of a notebook
330 """
330 """
331 model = self.get_model(path)
331 model = self.get_model(path)
332 nb = model['content']
332 nb = model['content']
333 self.log.warn("Trusting notebook %s", path)
333 self.log.warn("Trusting notebook %s", path)
334 self.notary.mark_cells(nb, True)
334 self.notary.mark_cells(nb, True)
335 self.save(model, path)
335 self.save(model, path)
336
336
337 def check_and_sign(self, nb, path=''):
337 def check_and_sign(self, nb, path=''):
338 """Check for trusted cells, and sign the notebook.
338 """Check for trusted cells, and sign the notebook.
339
339
340 Called as a part of saving notebooks.
340 Called as a part of saving notebooks.
341
341
342 Parameters
342 Parameters
343 ----------
343 ----------
344 nb : dict
344 nb : dict
345 The notebook dict
345 The notebook dict
346 path : string
346 path : string
347 The notebook's path (for logging)
347 The notebook's path (for logging)
348 """
348 """
349 if self.notary.check_cells(nb):
349 if self.notary.check_cells(nb):
350 self.notary.sign(nb)
350 self.notary.sign(nb)
351 else:
351 else:
352 self.log.warn("Saving untrusted notebook %s", path)
352 self.log.warn("Saving untrusted notebook %s", path)
353
353
354 def mark_trusted_cells(self, nb, path=''):
354 def mark_trusted_cells(self, nb, path=''):
355 """Mark cells as trusted if the notebook signature matches.
355 """Mark cells as trusted if the notebook signature matches.
356
356
357 Called as a part of loading notebooks.
357 Called as a part of loading notebooks.
358
358
359 Parameters
359 Parameters
360 ----------
360 ----------
361 nb : dict
361 nb : dict
362 The notebook object (in current nbformat)
362 The notebook object (in current nbformat)
363 path : string
363 path : string
364 The notebook's path (for logging)
364 The notebook's path (for logging)
365 """
365 """
366 trusted = self.notary.check_signature(nb)
366 trusted = self.notary.check_signature(nb)
367 if not trusted:
367 if not trusted:
368 self.log.warn("Notebook %s is not trusted", path)
368 self.log.warn("Notebook %s is not trusted", path)
369 self.notary.mark_cells(nb, trusted)
369 self.notary.mark_cells(nb, trusted)
370
370
371 def should_list(self, name):
371 def should_list(self, name):
372 """Should this file/directory name be displayed in a listing?"""
372 """Should this file/directory name be displayed in a listing?"""
373 return not any(fnmatch(name, glob) for glob in self.hide_globs)
373 return not any(fnmatch(name, glob) for glob in self.hide_globs)
@@ -1,495 +1,504 b''
1 # coding: utf-8
1 # coding: utf-8
2 """Test the contents webservice API."""
2 """Test the contents webservice API."""
3
3
4 import base64
4 import base64
5 import io
5 import io
6 import json
6 import json
7 import os
7 import os
8 import shutil
8 import shutil
9 from unicodedata import normalize
9 from unicodedata import normalize
10
10
11 pjoin = os.path.join
11 pjoin = os.path.join
12
12
13 import requests
13 import requests
14
14
15 from IPython.html.utils import url_path_join, url_escape
15 from IPython.html.utils import url_path_join, url_escape
16 from IPython.html.tests.launchnotebook import NotebookTestBase, assert_http_error
16 from IPython.html.tests.launchnotebook import NotebookTestBase, assert_http_error
17 from IPython.nbformat import read, write, from_dict
17 from IPython.nbformat import read, write, from_dict
18 from IPython.nbformat.v4 import (
18 from IPython.nbformat.v4 import (
19 new_notebook, new_markdown_cell,
19 new_notebook, new_markdown_cell,
20 )
20 )
21 from IPython.nbformat import v2
21 from IPython.nbformat import v2
22 from IPython.utils import py3compat
22 from IPython.utils import py3compat
23 from IPython.utils.data import uniq_stable
23 from IPython.utils.data import uniq_stable
24
24
25
25
26 def notebooks_only(dir_model):
26 def notebooks_only(dir_model):
27 return [nb for nb in dir_model['content'] if nb['type']=='notebook']
27 return [nb for nb in dir_model['content'] if nb['type']=='notebook']
28
28
29 def dirs_only(dir_model):
29 def dirs_only(dir_model):
30 return [x for x in dir_model['content'] if x['type']=='directory']
30 return [x for x in dir_model['content'] if x['type']=='directory']
31
31
32
32
33 class API(object):
33 class API(object):
34 """Wrapper for contents API calls."""
34 """Wrapper for contents API calls."""
35 def __init__(self, base_url):
35 def __init__(self, base_url):
36 self.base_url = base_url
36 self.base_url = base_url
37
37
38 def _req(self, verb, path, body=None):
38 def _req(self, verb, path, body=None):
39 response = requests.request(verb,
39 response = requests.request(verb,
40 url_path_join(self.base_url, 'api/contents', path),
40 url_path_join(self.base_url, 'api/contents', path),
41 data=body,
41 data=body,
42 )
42 )
43 response.raise_for_status()
43 response.raise_for_status()
44 return response
44 return response
45
45
46 def list(self, path='/'):
46 def list(self, path='/'):
47 return self._req('GET', path)
47 return self._req('GET', path)
48
48
49 def read(self, path):
49 def read(self, path, type_=None):
50 if type_ is not None:
51 path += '?type=' + type_
50 return self._req('GET', path)
52 return self._req('GET', path)
51
53
52 def create_untitled(self, path='/', ext='.ipynb'):
54 def create_untitled(self, path='/', ext='.ipynb'):
53 body = None
55 body = None
54 if ext:
56 if ext:
55 body = json.dumps({'ext': ext})
57 body = json.dumps({'ext': ext})
56 return self._req('POST', path, body)
58 return self._req('POST', path, body)
57
59
58 def mkdir_untitled(self, path='/'):
60 def mkdir_untitled(self, path='/'):
59 return self._req('POST', path, json.dumps({'type': 'directory'}))
61 return self._req('POST', path, json.dumps({'type': 'directory'}))
60
62
61 def copy(self, copy_from, path='/'):
63 def copy(self, copy_from, path='/'):
62 body = json.dumps({'copy_from':copy_from})
64 body = json.dumps({'copy_from':copy_from})
63 return self._req('POST', path, body)
65 return self._req('POST', path, body)
64
66
65 def create(self, path='/'):
67 def create(self, path='/'):
66 return self._req('PUT', path)
68 return self._req('PUT', path)
67
69
68 def upload(self, path, body):
70 def upload(self, path, body):
69 return self._req('PUT', path, body)
71 return self._req('PUT', path, body)
70
72
71 def mkdir_untitled(self, path='/'):
73 def mkdir_untitled(self, path='/'):
72 return self._req('POST', path, json.dumps({'type': 'directory'}))
74 return self._req('POST', path, json.dumps({'type': 'directory'}))
73
75
74 def mkdir(self, path='/'):
76 def mkdir(self, path='/'):
75 return self._req('PUT', path, json.dumps({'type': 'directory'}))
77 return self._req('PUT', path, json.dumps({'type': 'directory'}))
76
78
77 def copy_put(self, copy_from, path='/'):
79 def copy_put(self, copy_from, path='/'):
78 body = json.dumps({'copy_from':copy_from})
80 body = json.dumps({'copy_from':copy_from})
79 return self._req('PUT', path, body)
81 return self._req('PUT', path, body)
80
82
81 def save(self, path, body):
83 def save(self, path, body):
82 return self._req('PUT', path, body)
84 return self._req('PUT', path, body)
83
85
84 def delete(self, path='/'):
86 def delete(self, path='/'):
85 return self._req('DELETE', path)
87 return self._req('DELETE', path)
86
88
87 def rename(self, path, new_path):
89 def rename(self, path, new_path):
88 body = json.dumps({'path': new_path})
90 body = json.dumps({'path': new_path})
89 return self._req('PATCH', path, body)
91 return self._req('PATCH', path, body)
90
92
91 def get_checkpoints(self, path):
93 def get_checkpoints(self, path):
92 return self._req('GET', url_path_join(path, 'checkpoints'))
94 return self._req('GET', url_path_join(path, 'checkpoints'))
93
95
94 def new_checkpoint(self, path):
96 def new_checkpoint(self, path):
95 return self._req('POST', url_path_join(path, 'checkpoints'))
97 return self._req('POST', url_path_join(path, 'checkpoints'))
96
98
97 def restore_checkpoint(self, path, checkpoint_id):
99 def restore_checkpoint(self, path, checkpoint_id):
98 return self._req('POST', url_path_join(path, 'checkpoints', checkpoint_id))
100 return self._req('POST', url_path_join(path, 'checkpoints', checkpoint_id))
99
101
100 def delete_checkpoint(self, path, checkpoint_id):
102 def delete_checkpoint(self, path, checkpoint_id):
101 return self._req('DELETE', url_path_join(path, 'checkpoints', checkpoint_id))
103 return self._req('DELETE', url_path_join(path, 'checkpoints', checkpoint_id))
102
104
103 class APITest(NotebookTestBase):
105 class APITest(NotebookTestBase):
104 """Test the kernels web service API"""
106 """Test the kernels web service API"""
105 dirs_nbs = [('', 'inroot'),
107 dirs_nbs = [('', 'inroot'),
106 ('Directory with spaces in', 'inspace'),
108 ('Directory with spaces in', 'inspace'),
107 (u'unicodΓ©', 'innonascii'),
109 (u'unicodΓ©', 'innonascii'),
108 ('foo', 'a'),
110 ('foo', 'a'),
109 ('foo', 'b'),
111 ('foo', 'b'),
110 ('foo', 'name with spaces'),
112 ('foo', 'name with spaces'),
111 ('foo', u'unicodΓ©'),
113 ('foo', u'unicodΓ©'),
112 ('foo/bar', 'baz'),
114 ('foo/bar', 'baz'),
113 ('ordering', 'A'),
115 ('ordering', 'A'),
114 ('ordering', 'b'),
116 ('ordering', 'b'),
115 ('ordering', 'C'),
117 ('ordering', 'C'),
116 (u'Γ₯ b', u'Γ§ d'),
118 (u'Γ₯ b', u'Γ§ d'),
117 ]
119 ]
118 hidden_dirs = ['.hidden', '__pycache__']
120 hidden_dirs = ['.hidden', '__pycache__']
119
121
120 dirs = uniq_stable([py3compat.cast_unicode(d) for (d,n) in dirs_nbs])
122 dirs = uniq_stable([py3compat.cast_unicode(d) for (d,n) in dirs_nbs])
121 del dirs[0] # remove ''
123 del dirs[0] # remove ''
122 top_level_dirs = {normalize('NFC', d.split('/')[0]) for d in dirs}
124 top_level_dirs = {normalize('NFC', d.split('/')[0]) for d in dirs}
123
125
124 @staticmethod
126 @staticmethod
125 def _blob_for_name(name):
127 def _blob_for_name(name):
126 return name.encode('utf-8') + b'\xFF'
128 return name.encode('utf-8') + b'\xFF'
127
129
128 @staticmethod
130 @staticmethod
129 def _txt_for_name(name):
131 def _txt_for_name(name):
130 return u'%s text file' % name
132 return u'%s text file' % name
131
133
132 def setUp(self):
134 def setUp(self):
133 nbdir = self.notebook_dir.name
135 nbdir = self.notebook_dir.name
134 self.blob = os.urandom(100)
136 self.blob = os.urandom(100)
135 self.b64_blob = base64.encodestring(self.blob).decode('ascii')
137 self.b64_blob = base64.encodestring(self.blob).decode('ascii')
136
138
137 for d in (self.dirs + self.hidden_dirs):
139 for d in (self.dirs + self.hidden_dirs):
138 d.replace('/', os.sep)
140 d.replace('/', os.sep)
139 if not os.path.isdir(pjoin(nbdir, d)):
141 if not os.path.isdir(pjoin(nbdir, d)):
140 os.mkdir(pjoin(nbdir, d))
142 os.mkdir(pjoin(nbdir, d))
141
143
142 for d, name in self.dirs_nbs:
144 for d, name in self.dirs_nbs:
143 d = d.replace('/', os.sep)
145 d = d.replace('/', os.sep)
144 # create a notebook
146 # create a notebook
145 with io.open(pjoin(nbdir, d, '%s.ipynb' % name), 'w',
147 with io.open(pjoin(nbdir, d, '%s.ipynb' % name), 'w',
146 encoding='utf-8') as f:
148 encoding='utf-8') as f:
147 nb = new_notebook()
149 nb = new_notebook()
148 write(nb, f, version=4)
150 write(nb, f, version=4)
149
151
150 # create a text file
152 # create a text file
151 with io.open(pjoin(nbdir, d, '%s.txt' % name), 'w',
153 with io.open(pjoin(nbdir, d, '%s.txt' % name), 'w',
152 encoding='utf-8') as f:
154 encoding='utf-8') as f:
153 f.write(self._txt_for_name(name))
155 f.write(self._txt_for_name(name))
154
156
155 # create a binary file
157 # create a binary file
156 with io.open(pjoin(nbdir, d, '%s.blob' % name), 'wb') as f:
158 with io.open(pjoin(nbdir, d, '%s.blob' % name), 'wb') as f:
157 f.write(self._blob_for_name(name))
159 f.write(self._blob_for_name(name))
158
160
159 self.api = API(self.base_url())
161 self.api = API(self.base_url())
160
162
161 def tearDown(self):
163 def tearDown(self):
162 nbdir = self.notebook_dir.name
164 nbdir = self.notebook_dir.name
163
165
164 for dname in (list(self.top_level_dirs) + self.hidden_dirs):
166 for dname in (list(self.top_level_dirs) + self.hidden_dirs):
165 shutil.rmtree(pjoin(nbdir, dname), ignore_errors=True)
167 shutil.rmtree(pjoin(nbdir, dname), ignore_errors=True)
166
168
167 if os.path.isfile(pjoin(nbdir, 'inroot.ipynb')):
169 if os.path.isfile(pjoin(nbdir, 'inroot.ipynb')):
168 os.unlink(pjoin(nbdir, 'inroot.ipynb'))
170 os.unlink(pjoin(nbdir, 'inroot.ipynb'))
169
171
170 def test_list_notebooks(self):
172 def test_list_notebooks(self):
171 nbs = notebooks_only(self.api.list().json())
173 nbs = notebooks_only(self.api.list().json())
172 self.assertEqual(len(nbs), 1)
174 self.assertEqual(len(nbs), 1)
173 self.assertEqual(nbs[0]['name'], 'inroot.ipynb')
175 self.assertEqual(nbs[0]['name'], 'inroot.ipynb')
174
176
175 nbs = notebooks_only(self.api.list('/Directory with spaces in/').json())
177 nbs = notebooks_only(self.api.list('/Directory with spaces in/').json())
176 self.assertEqual(len(nbs), 1)
178 self.assertEqual(len(nbs), 1)
177 self.assertEqual(nbs[0]['name'], 'inspace.ipynb')
179 self.assertEqual(nbs[0]['name'], 'inspace.ipynb')
178
180
179 nbs = notebooks_only(self.api.list(u'/unicodΓ©/').json())
181 nbs = notebooks_only(self.api.list(u'/unicodΓ©/').json())
180 self.assertEqual(len(nbs), 1)
182 self.assertEqual(len(nbs), 1)
181 self.assertEqual(nbs[0]['name'], 'innonascii.ipynb')
183 self.assertEqual(nbs[0]['name'], 'innonascii.ipynb')
182 self.assertEqual(nbs[0]['path'], u'unicodΓ©/innonascii.ipynb')
184 self.assertEqual(nbs[0]['path'], u'unicodΓ©/innonascii.ipynb')
183
185
184 nbs = notebooks_only(self.api.list('/foo/bar/').json())
186 nbs = notebooks_only(self.api.list('/foo/bar/').json())
185 self.assertEqual(len(nbs), 1)
187 self.assertEqual(len(nbs), 1)
186 self.assertEqual(nbs[0]['name'], 'baz.ipynb')
188 self.assertEqual(nbs[0]['name'], 'baz.ipynb')
187 self.assertEqual(nbs[0]['path'], 'foo/bar/baz.ipynb')
189 self.assertEqual(nbs[0]['path'], 'foo/bar/baz.ipynb')
188
190
189 nbs = notebooks_only(self.api.list('foo').json())
191 nbs = notebooks_only(self.api.list('foo').json())
190 self.assertEqual(len(nbs), 4)
192 self.assertEqual(len(nbs), 4)
191 nbnames = { normalize('NFC', n['name']) for n in nbs }
193 nbnames = { normalize('NFC', n['name']) for n in nbs }
192 expected = [ u'a.ipynb', u'b.ipynb', u'name with spaces.ipynb', u'unicodΓ©.ipynb']
194 expected = [ u'a.ipynb', u'b.ipynb', u'name with spaces.ipynb', u'unicodΓ©.ipynb']
193 expected = { normalize('NFC', name) for name in expected }
195 expected = { normalize('NFC', name) for name in expected }
194 self.assertEqual(nbnames, expected)
196 self.assertEqual(nbnames, expected)
195
197
196 nbs = notebooks_only(self.api.list('ordering').json())
198 nbs = notebooks_only(self.api.list('ordering').json())
197 nbnames = [n['name'] for n in nbs]
199 nbnames = [n['name'] for n in nbs]
198 expected = ['A.ipynb', 'b.ipynb', 'C.ipynb']
200 expected = ['A.ipynb', 'b.ipynb', 'C.ipynb']
199 self.assertEqual(nbnames, expected)
201 self.assertEqual(nbnames, expected)
200
202
201 def test_list_dirs(self):
203 def test_list_dirs(self):
202 print(self.api.list().json())
204 print(self.api.list().json())
203 dirs = dirs_only(self.api.list().json())
205 dirs = dirs_only(self.api.list().json())
204 dir_names = {normalize('NFC', d['name']) for d in dirs}
206 dir_names = {normalize('NFC', d['name']) for d in dirs}
205 print(dir_names)
207 print(dir_names)
206 print(self.top_level_dirs)
208 print(self.top_level_dirs)
207 self.assertEqual(dir_names, self.top_level_dirs) # Excluding hidden dirs
209 self.assertEqual(dir_names, self.top_level_dirs) # Excluding hidden dirs
208
210
209 def test_list_nonexistant_dir(self):
211 def test_list_nonexistant_dir(self):
210 with assert_http_error(404):
212 with assert_http_error(404):
211 self.api.list('nonexistant')
213 self.api.list('nonexistant')
212
214
213 def test_get_nb_contents(self):
215 def test_get_nb_contents(self):
214 for d, name in self.dirs_nbs:
216 for d, name in self.dirs_nbs:
215 path = url_path_join(d, name + '.ipynb')
217 path = url_path_join(d, name + '.ipynb')
216 nb = self.api.read(path).json()
218 nb = self.api.read(path).json()
217 self.assertEqual(nb['name'], u'%s.ipynb' % name)
219 self.assertEqual(nb['name'], u'%s.ipynb' % name)
218 self.assertEqual(nb['path'], path)
220 self.assertEqual(nb['path'], path)
219 self.assertEqual(nb['type'], 'notebook')
221 self.assertEqual(nb['type'], 'notebook')
220 self.assertIn('content', nb)
222 self.assertIn('content', nb)
221 self.assertEqual(nb['format'], 'json')
223 self.assertEqual(nb['format'], 'json')
222 self.assertIn('content', nb)
224 self.assertIn('content', nb)
223 self.assertIn('metadata', nb['content'])
225 self.assertIn('metadata', nb['content'])
224 self.assertIsInstance(nb['content']['metadata'], dict)
226 self.assertIsInstance(nb['content']['metadata'], dict)
225
227
226 def test_get_contents_no_such_file(self):
228 def test_get_contents_no_such_file(self):
227 # Name that doesn't exist - should be a 404
229 # Name that doesn't exist - should be a 404
228 with assert_http_error(404):
230 with assert_http_error(404):
229 self.api.read('foo/q.ipynb')
231 self.api.read('foo/q.ipynb')
230
232
231 def test_get_text_file_contents(self):
233 def test_get_text_file_contents(self):
232 for d, name in self.dirs_nbs:
234 for d, name in self.dirs_nbs:
233 path = url_path_join(d, name + '.txt')
235 path = url_path_join(d, name + '.txt')
234 model = self.api.read(path).json()
236 model = self.api.read(path).json()
235 self.assertEqual(model['name'], u'%s.txt' % name)
237 self.assertEqual(model['name'], u'%s.txt' % name)
236 self.assertEqual(model['path'], path)
238 self.assertEqual(model['path'], path)
237 self.assertIn('content', model)
239 self.assertIn('content', model)
238 self.assertEqual(model['format'], 'text')
240 self.assertEqual(model['format'], 'text')
239 self.assertEqual(model['type'], 'file')
241 self.assertEqual(model['type'], 'file')
240 self.assertEqual(model['content'], self._txt_for_name(name))
242 self.assertEqual(model['content'], self._txt_for_name(name))
241
243
242 # Name that doesn't exist - should be a 404
244 # Name that doesn't exist - should be a 404
243 with assert_http_error(404):
245 with assert_http_error(404):
244 self.api.read('foo/q.txt')
246 self.api.read('foo/q.txt')
245
247
246 def test_get_binary_file_contents(self):
248 def test_get_binary_file_contents(self):
247 for d, name in self.dirs_nbs:
249 for d, name in self.dirs_nbs:
248 path = url_path_join(d, name + '.blob')
250 path = url_path_join(d, name + '.blob')
249 model = self.api.read(path).json()
251 model = self.api.read(path).json()
250 self.assertEqual(model['name'], u'%s.blob' % name)
252 self.assertEqual(model['name'], u'%s.blob' % name)
251 self.assertEqual(model['path'], path)
253 self.assertEqual(model['path'], path)
252 self.assertIn('content', model)
254 self.assertIn('content', model)
253 self.assertEqual(model['format'], 'base64')
255 self.assertEqual(model['format'], 'base64')
254 self.assertEqual(model['type'], 'file')
256 self.assertEqual(model['type'], 'file')
255 b64_data = base64.encodestring(self._blob_for_name(name)).decode('ascii')
257 b64_data = base64.encodestring(self._blob_for_name(name)).decode('ascii')
256 self.assertEqual(model['content'], b64_data)
258 self.assertEqual(model['content'], b64_data)
257
259
258 # Name that doesn't exist - should be a 404
260 # Name that doesn't exist - should be a 404
259 with assert_http_error(404):
261 with assert_http_error(404):
260 self.api.read('foo/q.txt')
262 self.api.read('foo/q.txt')
261
263
264 def test_get_bad_type(self):
265 with assert_http_error(400):
266 self.api.read(u'unicodΓ©', type_='file') # this is a directory
267
268 with assert_http_error(400):
269 self.api.read(u'unicodΓ©/innonascii.ipynb', type_='directory')
270
262 def _check_created(self, resp, path, type='notebook'):
271 def _check_created(self, resp, path, type='notebook'):
263 self.assertEqual(resp.status_code, 201)
272 self.assertEqual(resp.status_code, 201)
264 location_header = py3compat.str_to_unicode(resp.headers['Location'])
273 location_header = py3compat.str_to_unicode(resp.headers['Location'])
265 self.assertEqual(location_header, url_escape(url_path_join(u'/api/contents', path)))
274 self.assertEqual(location_header, url_escape(url_path_join(u'/api/contents', path)))
266 rjson = resp.json()
275 rjson = resp.json()
267 self.assertEqual(rjson['name'], path.rsplit('/', 1)[-1])
276 self.assertEqual(rjson['name'], path.rsplit('/', 1)[-1])
268 self.assertEqual(rjson['path'], path)
277 self.assertEqual(rjson['path'], path)
269 self.assertEqual(rjson['type'], type)
278 self.assertEqual(rjson['type'], type)
270 isright = os.path.isdir if type == 'directory' else os.path.isfile
279 isright = os.path.isdir if type == 'directory' else os.path.isfile
271 assert isright(pjoin(
280 assert isright(pjoin(
272 self.notebook_dir.name,
281 self.notebook_dir.name,
273 path.replace('/', os.sep),
282 path.replace('/', os.sep),
274 ))
283 ))
275
284
276 def test_create_untitled(self):
285 def test_create_untitled(self):
277 resp = self.api.create_untitled(path=u'Γ₯ b')
286 resp = self.api.create_untitled(path=u'Γ₯ b')
278 self._check_created(resp, u'Γ₯ b/Untitled0.ipynb')
287 self._check_created(resp, u'Γ₯ b/Untitled0.ipynb')
279
288
280 # Second time
289 # Second time
281 resp = self.api.create_untitled(path=u'Γ₯ b')
290 resp = self.api.create_untitled(path=u'Γ₯ b')
282 self._check_created(resp, u'Γ₯ b/Untitled1.ipynb')
291 self._check_created(resp, u'Γ₯ b/Untitled1.ipynb')
283
292
284 # And two directories down
293 # And two directories down
285 resp = self.api.create_untitled(path='foo/bar')
294 resp = self.api.create_untitled(path='foo/bar')
286 self._check_created(resp, 'foo/bar/Untitled0.ipynb')
295 self._check_created(resp, 'foo/bar/Untitled0.ipynb')
287
296
288 def test_create_untitled_txt(self):
297 def test_create_untitled_txt(self):
289 resp = self.api.create_untitled(path='foo/bar', ext='.txt')
298 resp = self.api.create_untitled(path='foo/bar', ext='.txt')
290 self._check_created(resp, 'foo/bar/untitled0.txt', type='file')
299 self._check_created(resp, 'foo/bar/untitled0.txt', type='file')
291
300
292 resp = self.api.read(path='foo/bar/untitled0.txt')
301 resp = self.api.read(path='foo/bar/untitled0.txt')
293 model = resp.json()
302 model = resp.json()
294 self.assertEqual(model['type'], 'file')
303 self.assertEqual(model['type'], 'file')
295 self.assertEqual(model['format'], 'text')
304 self.assertEqual(model['format'], 'text')
296 self.assertEqual(model['content'], '')
305 self.assertEqual(model['content'], '')
297
306
298 def test_upload(self):
307 def test_upload(self):
299 nb = new_notebook()
308 nb = new_notebook()
300 nbmodel = {'content': nb, 'type': 'notebook'}
309 nbmodel = {'content': nb, 'type': 'notebook'}
301 path = u'Γ₯ b/Upload tΓ©st.ipynb'
310 path = u'Γ₯ b/Upload tΓ©st.ipynb'
302 resp = self.api.upload(path, body=json.dumps(nbmodel))
311 resp = self.api.upload(path, body=json.dumps(nbmodel))
303 self._check_created(resp, path)
312 self._check_created(resp, path)
304
313
305 def test_mkdir_untitled(self):
314 def test_mkdir_untitled(self):
306 resp = self.api.mkdir_untitled(path=u'Γ₯ b')
315 resp = self.api.mkdir_untitled(path=u'Γ₯ b')
307 self._check_created(resp, u'Γ₯ b/Untitled Folder0', type='directory')
316 self._check_created(resp, u'Γ₯ b/Untitled Folder0', type='directory')
308
317
309 # Second time
318 # Second time
310 resp = self.api.mkdir_untitled(path=u'Γ₯ b')
319 resp = self.api.mkdir_untitled(path=u'Γ₯ b')
311 self._check_created(resp, u'Γ₯ b/Untitled Folder1', type='directory')
320 self._check_created(resp, u'Γ₯ b/Untitled Folder1', type='directory')
312
321
313 # And two directories down
322 # And two directories down
314 resp = self.api.mkdir_untitled(path='foo/bar')
323 resp = self.api.mkdir_untitled(path='foo/bar')
315 self._check_created(resp, 'foo/bar/Untitled Folder0', type='directory')
324 self._check_created(resp, 'foo/bar/Untitled Folder0', type='directory')
316
325
317 def test_mkdir(self):
326 def test_mkdir(self):
318 path = u'Γ₯ b/New βˆ‚ir'
327 path = u'Γ₯ b/New βˆ‚ir'
319 resp = self.api.mkdir(path)
328 resp = self.api.mkdir(path)
320 self._check_created(resp, path, type='directory')
329 self._check_created(resp, path, type='directory')
321
330
322 def test_mkdir_hidden_400(self):
331 def test_mkdir_hidden_400(self):
323 with assert_http_error(400):
332 with assert_http_error(400):
324 resp = self.api.mkdir(u'Γ₯ b/.hidden')
333 resp = self.api.mkdir(u'Γ₯ b/.hidden')
325
334
326 def test_upload_txt(self):
335 def test_upload_txt(self):
327 body = u'ΓΌnicode tΓ©xt'
336 body = u'ΓΌnicode tΓ©xt'
328 model = {
337 model = {
329 'content' : body,
338 'content' : body,
330 'format' : 'text',
339 'format' : 'text',
331 'type' : 'file',
340 'type' : 'file',
332 }
341 }
333 path = u'Γ₯ b/Upload tΓ©st.txt'
342 path = u'Γ₯ b/Upload tΓ©st.txt'
334 resp = self.api.upload(path, body=json.dumps(model))
343 resp = self.api.upload(path, body=json.dumps(model))
335
344
336 # check roundtrip
345 # check roundtrip
337 resp = self.api.read(path)
346 resp = self.api.read(path)
338 model = resp.json()
347 model = resp.json()
339 self.assertEqual(model['type'], 'file')
348 self.assertEqual(model['type'], 'file')
340 self.assertEqual(model['format'], 'text')
349 self.assertEqual(model['format'], 'text')
341 self.assertEqual(model['content'], body)
350 self.assertEqual(model['content'], body)
342
351
343 def test_upload_b64(self):
352 def test_upload_b64(self):
344 body = b'\xFFblob'
353 body = b'\xFFblob'
345 b64body = base64.encodestring(body).decode('ascii')
354 b64body = base64.encodestring(body).decode('ascii')
346 model = {
355 model = {
347 'content' : b64body,
356 'content' : b64body,
348 'format' : 'base64',
357 'format' : 'base64',
349 'type' : 'file',
358 'type' : 'file',
350 }
359 }
351 path = u'Γ₯ b/Upload tΓ©st.blob'
360 path = u'Γ₯ b/Upload tΓ©st.blob'
352 resp = self.api.upload(path, body=json.dumps(model))
361 resp = self.api.upload(path, body=json.dumps(model))
353
362
354 # check roundtrip
363 # check roundtrip
355 resp = self.api.read(path)
364 resp = self.api.read(path)
356 model = resp.json()
365 model = resp.json()
357 self.assertEqual(model['type'], 'file')
366 self.assertEqual(model['type'], 'file')
358 self.assertEqual(model['path'], path)
367 self.assertEqual(model['path'], path)
359 self.assertEqual(model['format'], 'base64')
368 self.assertEqual(model['format'], 'base64')
360 decoded = base64.decodestring(model['content'].encode('ascii'))
369 decoded = base64.decodestring(model['content'].encode('ascii'))
361 self.assertEqual(decoded, body)
370 self.assertEqual(decoded, body)
362
371
363 def test_upload_v2(self):
372 def test_upload_v2(self):
364 nb = v2.new_notebook()
373 nb = v2.new_notebook()
365 ws = v2.new_worksheet()
374 ws = v2.new_worksheet()
366 nb.worksheets.append(ws)
375 nb.worksheets.append(ws)
367 ws.cells.append(v2.new_code_cell(input='print("hi")'))
376 ws.cells.append(v2.new_code_cell(input='print("hi")'))
368 nbmodel = {'content': nb, 'type': 'notebook'}
377 nbmodel = {'content': nb, 'type': 'notebook'}
369 path = u'Γ₯ b/Upload tΓ©st.ipynb'
378 path = u'Γ₯ b/Upload tΓ©st.ipynb'
370 resp = self.api.upload(path, body=json.dumps(nbmodel))
379 resp = self.api.upload(path, body=json.dumps(nbmodel))
371 self._check_created(resp, path)
380 self._check_created(resp, path)
372 resp = self.api.read(path)
381 resp = self.api.read(path)
373 data = resp.json()
382 data = resp.json()
374 self.assertEqual(data['content']['nbformat'], 4)
383 self.assertEqual(data['content']['nbformat'], 4)
375
384
376 def test_copy(self):
385 def test_copy(self):
377 resp = self.api.copy(u'Γ₯ b/Γ§ d.ipynb', u'unicodΓ©')
386 resp = self.api.copy(u'Γ₯ b/Γ§ d.ipynb', u'unicodΓ©')
378 self._check_created(resp, u'unicodΓ©/Γ§ d-Copy0.ipynb')
387 self._check_created(resp, u'unicodΓ©/Γ§ d-Copy0.ipynb')
379
388
380 resp = self.api.copy(u'Γ₯ b/Γ§ d.ipynb', u'Γ₯ b')
389 resp = self.api.copy(u'Γ₯ b/Γ§ d.ipynb', u'Γ₯ b')
381 self._check_created(resp, u'Γ₯ b/Γ§ d-Copy0.ipynb')
390 self._check_created(resp, u'Γ₯ b/Γ§ d-Copy0.ipynb')
382
391
383 def test_copy_path(self):
392 def test_copy_path(self):
384 resp = self.api.copy(u'foo/a.ipynb', u'Γ₯ b')
393 resp = self.api.copy(u'foo/a.ipynb', u'Γ₯ b')
385 self._check_created(resp, u'Γ₯ b/a-Copy0.ipynb')
394 self._check_created(resp, u'Γ₯ b/a-Copy0.ipynb')
386
395
387 def test_copy_put_400(self):
396 def test_copy_put_400(self):
388 with assert_http_error(400):
397 with assert_http_error(400):
389 resp = self.api.copy_put(u'Γ₯ b/Γ§ d.ipynb', u'Γ₯ b/cΓΈpy.ipynb')
398 resp = self.api.copy_put(u'Γ₯ b/Γ§ d.ipynb', u'Γ₯ b/cΓΈpy.ipynb')
390
399
391 def test_copy_dir_400(self):
400 def test_copy_dir_400(self):
392 # can't copy directories
401 # can't copy directories
393 with assert_http_error(400):
402 with assert_http_error(400):
394 resp = self.api.copy(u'Γ₯ b', u'foo')
403 resp = self.api.copy(u'Γ₯ b', u'foo')
395
404
396 def test_delete(self):
405 def test_delete(self):
397 for d, name in self.dirs_nbs:
406 for d, name in self.dirs_nbs:
398 print('%r, %r' % (d, name))
407 print('%r, %r' % (d, name))
399 resp = self.api.delete(url_path_join(d, name + '.ipynb'))
408 resp = self.api.delete(url_path_join(d, name + '.ipynb'))
400 self.assertEqual(resp.status_code, 204)
409 self.assertEqual(resp.status_code, 204)
401
410
402 for d in self.dirs + ['/']:
411 for d in self.dirs + ['/']:
403 nbs = notebooks_only(self.api.list(d).json())
412 nbs = notebooks_only(self.api.list(d).json())
404 print('------')
413 print('------')
405 print(d)
414 print(d)
406 print(nbs)
415 print(nbs)
407 self.assertEqual(nbs, [])
416 self.assertEqual(nbs, [])
408
417
409 def test_delete_dirs(self):
418 def test_delete_dirs(self):
410 # depth-first delete everything, so we don't try to delete empty directories
419 # depth-first delete everything, so we don't try to delete empty directories
411 for name in sorted(self.dirs + ['/'], key=len, reverse=True):
420 for name in sorted(self.dirs + ['/'], key=len, reverse=True):
412 listing = self.api.list(name).json()['content']
421 listing = self.api.list(name).json()['content']
413 for model in listing:
422 for model in listing:
414 self.api.delete(model['path'])
423 self.api.delete(model['path'])
415 listing = self.api.list('/').json()['content']
424 listing = self.api.list('/').json()['content']
416 self.assertEqual(listing, [])
425 self.assertEqual(listing, [])
417
426
418 def test_delete_non_empty_dir(self):
427 def test_delete_non_empty_dir(self):
419 """delete non-empty dir raises 400"""
428 """delete non-empty dir raises 400"""
420 with assert_http_error(400):
429 with assert_http_error(400):
421 self.api.delete(u'Γ₯ b')
430 self.api.delete(u'Γ₯ b')
422
431
423 def test_rename(self):
432 def test_rename(self):
424 resp = self.api.rename('foo/a.ipynb', 'foo/z.ipynb')
433 resp = self.api.rename('foo/a.ipynb', 'foo/z.ipynb')
425 self.assertEqual(resp.headers['Location'].split('/')[-1], 'z.ipynb')
434 self.assertEqual(resp.headers['Location'].split('/')[-1], 'z.ipynb')
426 self.assertEqual(resp.json()['name'], 'z.ipynb')
435 self.assertEqual(resp.json()['name'], 'z.ipynb')
427 self.assertEqual(resp.json()['path'], 'foo/z.ipynb')
436 self.assertEqual(resp.json()['path'], 'foo/z.ipynb')
428 assert os.path.isfile(pjoin(self.notebook_dir.name, 'foo', 'z.ipynb'))
437 assert os.path.isfile(pjoin(self.notebook_dir.name, 'foo', 'z.ipynb'))
429
438
430 nbs = notebooks_only(self.api.list('foo').json())
439 nbs = notebooks_only(self.api.list('foo').json())
431 nbnames = set(n['name'] for n in nbs)
440 nbnames = set(n['name'] for n in nbs)
432 self.assertIn('z.ipynb', nbnames)
441 self.assertIn('z.ipynb', nbnames)
433 self.assertNotIn('a.ipynb', nbnames)
442 self.assertNotIn('a.ipynb', nbnames)
434
443
435 def test_rename_existing(self):
444 def test_rename_existing(self):
436 with assert_http_error(409):
445 with assert_http_error(409):
437 self.api.rename('foo/a.ipynb', 'foo/b.ipynb')
446 self.api.rename('foo/a.ipynb', 'foo/b.ipynb')
438
447
439 def test_save(self):
448 def test_save(self):
440 resp = self.api.read('foo/a.ipynb')
449 resp = self.api.read('foo/a.ipynb')
441 nbcontent = json.loads(resp.text)['content']
450 nbcontent = json.loads(resp.text)['content']
442 nb = from_dict(nbcontent)
451 nb = from_dict(nbcontent)
443 nb.cells.append(new_markdown_cell(u'Created by test Β³'))
452 nb.cells.append(new_markdown_cell(u'Created by test Β³'))
444
453
445 nbmodel= {'content': nb, 'type': 'notebook'}
454 nbmodel= {'content': nb, 'type': 'notebook'}
446 resp = self.api.save('foo/a.ipynb', body=json.dumps(nbmodel))
455 resp = self.api.save('foo/a.ipynb', body=json.dumps(nbmodel))
447
456
448 nbfile = pjoin(self.notebook_dir.name, 'foo', 'a.ipynb')
457 nbfile = pjoin(self.notebook_dir.name, 'foo', 'a.ipynb')
449 with io.open(nbfile, 'r', encoding='utf-8') as f:
458 with io.open(nbfile, 'r', encoding='utf-8') as f:
450 newnb = read(f, as_version=4)
459 newnb = read(f, as_version=4)
451 self.assertEqual(newnb.cells[0].source,
460 self.assertEqual(newnb.cells[0].source,
452 u'Created by test Β³')
461 u'Created by test Β³')
453 nbcontent = self.api.read('foo/a.ipynb').json()['content']
462 nbcontent = self.api.read('foo/a.ipynb').json()['content']
454 newnb = from_dict(nbcontent)
463 newnb = from_dict(nbcontent)
455 self.assertEqual(newnb.cells[0].source,
464 self.assertEqual(newnb.cells[0].source,
456 u'Created by test Β³')
465 u'Created by test Β³')
457
466
458
467
459 def test_checkpoints(self):
468 def test_checkpoints(self):
460 resp = self.api.read('foo/a.ipynb')
469 resp = self.api.read('foo/a.ipynb')
461 r = self.api.new_checkpoint('foo/a.ipynb')
470 r = self.api.new_checkpoint('foo/a.ipynb')
462 self.assertEqual(r.status_code, 201)
471 self.assertEqual(r.status_code, 201)
463 cp1 = r.json()
472 cp1 = r.json()
464 self.assertEqual(set(cp1), {'id', 'last_modified'})
473 self.assertEqual(set(cp1), {'id', 'last_modified'})
465 self.assertEqual(r.headers['Location'].split('/')[-1], cp1['id'])
474 self.assertEqual(r.headers['Location'].split('/')[-1], cp1['id'])
466
475
467 # Modify it
476 # Modify it
468 nbcontent = json.loads(resp.text)['content']
477 nbcontent = json.loads(resp.text)['content']
469 nb = from_dict(nbcontent)
478 nb = from_dict(nbcontent)
470 hcell = new_markdown_cell('Created by test')
479 hcell = new_markdown_cell('Created by test')
471 nb.cells.append(hcell)
480 nb.cells.append(hcell)
472 # Save
481 # Save
473 nbmodel= {'content': nb, 'type': 'notebook'}
482 nbmodel= {'content': nb, 'type': 'notebook'}
474 resp = self.api.save('foo/a.ipynb', body=json.dumps(nbmodel))
483 resp = self.api.save('foo/a.ipynb', body=json.dumps(nbmodel))
475
484
476 # List checkpoints
485 # List checkpoints
477 cps = self.api.get_checkpoints('foo/a.ipynb').json()
486 cps = self.api.get_checkpoints('foo/a.ipynb').json()
478 self.assertEqual(cps, [cp1])
487 self.assertEqual(cps, [cp1])
479
488
480 nbcontent = self.api.read('foo/a.ipynb').json()['content']
489 nbcontent = self.api.read('foo/a.ipynb').json()['content']
481 nb = from_dict(nbcontent)
490 nb = from_dict(nbcontent)
482 self.assertEqual(nb.cells[0].source, 'Created by test')
491 self.assertEqual(nb.cells[0].source, 'Created by test')
483
492
484 # Restore cp1
493 # Restore cp1
485 r = self.api.restore_checkpoint('foo/a.ipynb', cp1['id'])
494 r = self.api.restore_checkpoint('foo/a.ipynb', cp1['id'])
486 self.assertEqual(r.status_code, 204)
495 self.assertEqual(r.status_code, 204)
487 nbcontent = self.api.read('foo/a.ipynb').json()['content']
496 nbcontent = self.api.read('foo/a.ipynb').json()['content']
488 nb = from_dict(nbcontent)
497 nb = from_dict(nbcontent)
489 self.assertEqual(nb.cells, [])
498 self.assertEqual(nb.cells, [])
490
499
491 # Delete cp1
500 # Delete cp1
492 r = self.api.delete_checkpoint('foo/a.ipynb', cp1['id'])
501 r = self.api.delete_checkpoint('foo/a.ipynb', cp1['id'])
493 self.assertEqual(r.status_code, 204)
502 self.assertEqual(r.status_code, 204)
494 cps = self.api.get_checkpoints('foo/a.ipynb').json()
503 cps = self.api.get_checkpoints('foo/a.ipynb').json()
495 self.assertEqual(cps, [])
504 self.assertEqual(cps, [])
@@ -1,348 +1,362 b''
1 # coding: utf-8
1 # coding: utf-8
2 """Tests for the notebook manager."""
2 """Tests for the notebook manager."""
3 from __future__ import print_function
3 from __future__ import print_function
4
4
5 import logging
5 import logging
6 import os
6 import os
7
7
8 from tornado.web import HTTPError
8 from tornado.web import HTTPError
9 from unittest import TestCase
9 from unittest import TestCase
10 from tempfile import NamedTemporaryFile
10 from tempfile import NamedTemporaryFile
11
11
12 from IPython.nbformat import v4 as nbformat
12 from IPython.nbformat import v4 as nbformat
13
13
14 from IPython.utils.tempdir import TemporaryDirectory
14 from IPython.utils.tempdir import TemporaryDirectory
15 from IPython.utils.traitlets import TraitError
15 from IPython.utils.traitlets import TraitError
16 from IPython.html.utils import url_path_join
16 from IPython.html.utils import url_path_join
17 from IPython.testing import decorators as dec
17 from IPython.testing import decorators as dec
18
18
19 from ..filemanager import FileContentsManager
19 from ..filemanager import FileContentsManager
20 from ..manager import ContentsManager
20 from ..manager import ContentsManager
21
21
22
22
23 class TestFileContentsManager(TestCase):
23 class TestFileContentsManager(TestCase):
24
24
25 def test_root_dir(self):
25 def test_root_dir(self):
26 with TemporaryDirectory() as td:
26 with TemporaryDirectory() as td:
27 fm = FileContentsManager(root_dir=td)
27 fm = FileContentsManager(root_dir=td)
28 self.assertEqual(fm.root_dir, td)
28 self.assertEqual(fm.root_dir, td)
29
29
30 def test_missing_root_dir(self):
30 def test_missing_root_dir(self):
31 with TemporaryDirectory() as td:
31 with TemporaryDirectory() as td:
32 root = os.path.join(td, 'notebook', 'dir', 'is', 'missing')
32 root = os.path.join(td, 'notebook', 'dir', 'is', 'missing')
33 self.assertRaises(TraitError, FileContentsManager, root_dir=root)
33 self.assertRaises(TraitError, FileContentsManager, root_dir=root)
34
34
35 def test_invalid_root_dir(self):
35 def test_invalid_root_dir(self):
36 with NamedTemporaryFile() as tf:
36 with NamedTemporaryFile() as tf:
37 self.assertRaises(TraitError, FileContentsManager, root_dir=tf.name)
37 self.assertRaises(TraitError, FileContentsManager, root_dir=tf.name)
38
38
39 def test_get_os_path(self):
39 def test_get_os_path(self):
40 # full filesystem path should be returned with correct operating system
40 # full filesystem path should be returned with correct operating system
41 # separators.
41 # separators.
42 with TemporaryDirectory() as td:
42 with TemporaryDirectory() as td:
43 root = td
43 root = td
44 fm = FileContentsManager(root_dir=root)
44 fm = FileContentsManager(root_dir=root)
45 path = fm._get_os_path('/path/to/notebook/test.ipynb')
45 path = fm._get_os_path('/path/to/notebook/test.ipynb')
46 rel_path_list = '/path/to/notebook/test.ipynb'.split('/')
46 rel_path_list = '/path/to/notebook/test.ipynb'.split('/')
47 fs_path = os.path.join(fm.root_dir, *rel_path_list)
47 fs_path = os.path.join(fm.root_dir, *rel_path_list)
48 self.assertEqual(path, fs_path)
48 self.assertEqual(path, fs_path)
49
49
50 fm = FileContentsManager(root_dir=root)
50 fm = FileContentsManager(root_dir=root)
51 path = fm._get_os_path('test.ipynb')
51 path = fm._get_os_path('test.ipynb')
52 fs_path = os.path.join(fm.root_dir, 'test.ipynb')
52 fs_path = os.path.join(fm.root_dir, 'test.ipynb')
53 self.assertEqual(path, fs_path)
53 self.assertEqual(path, fs_path)
54
54
55 fm = FileContentsManager(root_dir=root)
55 fm = FileContentsManager(root_dir=root)
56 path = fm._get_os_path('////test.ipynb')
56 path = fm._get_os_path('////test.ipynb')
57 fs_path = os.path.join(fm.root_dir, 'test.ipynb')
57 fs_path = os.path.join(fm.root_dir, 'test.ipynb')
58 self.assertEqual(path, fs_path)
58 self.assertEqual(path, fs_path)
59
59
60 def test_checkpoint_subdir(self):
60 def test_checkpoint_subdir(self):
61 subd = u'sub βˆ‚ir'
61 subd = u'sub βˆ‚ir'
62 cp_name = 'test-cp.ipynb'
62 cp_name = 'test-cp.ipynb'
63 with TemporaryDirectory() as td:
63 with TemporaryDirectory() as td:
64 root = td
64 root = td
65 os.mkdir(os.path.join(td, subd))
65 os.mkdir(os.path.join(td, subd))
66 fm = FileContentsManager(root_dir=root)
66 fm = FileContentsManager(root_dir=root)
67 cp_dir = fm.get_checkpoint_path('cp', 'test.ipynb')
67 cp_dir = fm.get_checkpoint_path('cp', 'test.ipynb')
68 cp_subdir = fm.get_checkpoint_path('cp', '/%s/test.ipynb' % subd)
68 cp_subdir = fm.get_checkpoint_path('cp', '/%s/test.ipynb' % subd)
69 self.assertNotEqual(cp_dir, cp_subdir)
69 self.assertNotEqual(cp_dir, cp_subdir)
70 self.assertEqual(cp_dir, os.path.join(root, fm.checkpoint_dir, cp_name))
70 self.assertEqual(cp_dir, os.path.join(root, fm.checkpoint_dir, cp_name))
71 self.assertEqual(cp_subdir, os.path.join(root, subd, fm.checkpoint_dir, cp_name))
71 self.assertEqual(cp_subdir, os.path.join(root, subd, fm.checkpoint_dir, cp_name))
72
72
73
73
74 class TestContentsManager(TestCase):
74 class TestContentsManager(TestCase):
75
75
76 def setUp(self):
76 def setUp(self):
77 self._temp_dir = TemporaryDirectory()
77 self._temp_dir = TemporaryDirectory()
78 self.td = self._temp_dir.name
78 self.td = self._temp_dir.name
79 self.contents_manager = FileContentsManager(
79 self.contents_manager = FileContentsManager(
80 root_dir=self.td,
80 root_dir=self.td,
81 log=logging.getLogger()
81 log=logging.getLogger()
82 )
82 )
83
83
84 def tearDown(self):
84 def tearDown(self):
85 self._temp_dir.cleanup()
85 self._temp_dir.cleanup()
86
86
87 def make_dir(self, abs_path, rel_path):
87 def make_dir(self, abs_path, rel_path):
88 """make subdirectory, rel_path is the relative path
88 """make subdirectory, rel_path is the relative path
89 to that directory from the location where the server started"""
89 to that directory from the location where the server started"""
90 os_path = os.path.join(abs_path, rel_path)
90 os_path = os.path.join(abs_path, rel_path)
91 try:
91 try:
92 os.makedirs(os_path)
92 os.makedirs(os_path)
93 except OSError:
93 except OSError:
94 print("Directory already exists: %r" % os_path)
94 print("Directory already exists: %r" % os_path)
95 return os_path
95 return os_path
96
96
97 def add_code_cell(self, nb):
97 def add_code_cell(self, nb):
98 output = nbformat.new_output("display_data", {'application/javascript': "alert('hi');"})
98 output = nbformat.new_output("display_data", {'application/javascript': "alert('hi');"})
99 cell = nbformat.new_code_cell("print('hi')", outputs=[output])
99 cell = nbformat.new_code_cell("print('hi')", outputs=[output])
100 nb.cells.append(cell)
100 nb.cells.append(cell)
101
101
102 def new_notebook(self):
102 def new_notebook(self):
103 cm = self.contents_manager
103 cm = self.contents_manager
104 model = cm.new_untitled(type='notebook')
104 model = cm.new_untitled(type='notebook')
105 name = model['name']
105 name = model['name']
106 path = model['path']
106 path = model['path']
107
107
108 full_model = cm.get_model(path)
108 full_model = cm.get_model(path)
109 nb = full_model['content']
109 nb = full_model['content']
110 self.add_code_cell(nb)
110 self.add_code_cell(nb)
111
111
112 cm.save(full_model, path)
112 cm.save(full_model, path)
113 return nb, name, path
113 return nb, name, path
114
114
115 def test_new_untitled(self):
115 def test_new_untitled(self):
116 cm = self.contents_manager
116 cm = self.contents_manager
117 # Test in root directory
117 # Test in root directory
118 model = cm.new_untitled(type='notebook')
118 model = cm.new_untitled(type='notebook')
119 assert isinstance(model, dict)
119 assert isinstance(model, dict)
120 self.assertIn('name', model)
120 self.assertIn('name', model)
121 self.assertIn('path', model)
121 self.assertIn('path', model)
122 self.assertIn('type', model)
122 self.assertIn('type', model)
123 self.assertEqual(model['type'], 'notebook')
123 self.assertEqual(model['type'], 'notebook')
124 self.assertEqual(model['name'], 'Untitled0.ipynb')
124 self.assertEqual(model['name'], 'Untitled0.ipynb')
125 self.assertEqual(model['path'], 'Untitled0.ipynb')
125 self.assertEqual(model['path'], 'Untitled0.ipynb')
126
126
127 # Test in sub-directory
127 # Test in sub-directory
128 model = cm.new_untitled(type='directory')
128 model = cm.new_untitled(type='directory')
129 assert isinstance(model, dict)
129 assert isinstance(model, dict)
130 self.assertIn('name', model)
130 self.assertIn('name', model)
131 self.assertIn('path', model)
131 self.assertIn('path', model)
132 self.assertIn('type', model)
132 self.assertIn('type', model)
133 self.assertEqual(model['type'], 'directory')
133 self.assertEqual(model['type'], 'directory')
134 self.assertEqual(model['name'], 'Untitled Folder0')
134 self.assertEqual(model['name'], 'Untitled Folder0')
135 self.assertEqual(model['path'], 'Untitled Folder0')
135 self.assertEqual(model['path'], 'Untitled Folder0')
136 sub_dir = model['path']
136 sub_dir = model['path']
137
137
138 model = cm.new_untitled(path=sub_dir)
138 model = cm.new_untitled(path=sub_dir)
139 assert isinstance(model, dict)
139 assert isinstance(model, dict)
140 self.assertIn('name', model)
140 self.assertIn('name', model)
141 self.assertIn('path', model)
141 self.assertIn('path', model)
142 self.assertIn('type', model)
142 self.assertIn('type', model)
143 self.assertEqual(model['type'], 'file')
143 self.assertEqual(model['type'], 'file')
144 self.assertEqual(model['name'], 'untitled0')
144 self.assertEqual(model['name'], 'untitled0')
145 self.assertEqual(model['path'], '%s/untitled0' % sub_dir)
145 self.assertEqual(model['path'], '%s/untitled0' % sub_dir)
146
146
147 def test_get(self):
147 def test_get(self):
148 cm = self.contents_manager
148 cm = self.contents_manager
149 # Create a notebook
149 # Create a notebook
150 model = cm.new_untitled(type='notebook')
150 model = cm.new_untitled(type='notebook')
151 name = model['name']
151 name = model['name']
152 path = model['path']
152 path = model['path']
153
153
154 # Check that we 'get' on the notebook we just created
154 # Check that we 'get' on the notebook we just created
155 model2 = cm.get_model(path)
155 model2 = cm.get_model(path)
156 assert isinstance(model2, dict)
156 assert isinstance(model2, dict)
157 self.assertIn('name', model2)
157 self.assertIn('name', model2)
158 self.assertIn('path', model2)
158 self.assertIn('path', model2)
159 self.assertEqual(model['name'], name)
159 self.assertEqual(model['name'], name)
160 self.assertEqual(model['path'], path)
160 self.assertEqual(model['path'], path)
161
161
162 nb_as_file = cm.get_model(path, content=True, type_='file')
163 self.assertEqual(nb_as_file['path'], path)
164 self.assertEqual(nb_as_file['type'], 'file')
165 self.assertEqual(nb_as_file['format'], 'text')
166 self.assertNotIsInstance(nb_as_file['content'], dict)
167
162 # Test in sub-directory
168 # Test in sub-directory
163 sub_dir = '/foo/'
169 sub_dir = '/foo/'
164 self.make_dir(cm.root_dir, 'foo')
170 self.make_dir(cm.root_dir, 'foo')
165 model = cm.new_untitled(path=sub_dir, ext='.ipynb')
171 model = cm.new_untitled(path=sub_dir, ext='.ipynb')
166 model2 = cm.get_model(sub_dir + name)
172 model2 = cm.get_model(sub_dir + name)
167 assert isinstance(model2, dict)
173 assert isinstance(model2, dict)
168 self.assertIn('name', model2)
174 self.assertIn('name', model2)
169 self.assertIn('path', model2)
175 self.assertIn('path', model2)
170 self.assertIn('content', model2)
176 self.assertIn('content', model2)
171 self.assertEqual(model2['name'], 'Untitled0.ipynb')
177 self.assertEqual(model2['name'], 'Untitled0.ipynb')
172 self.assertEqual(model2['path'], '{0}/{1}'.format(sub_dir.strip('/'), name))
178 self.assertEqual(model2['path'], '{0}/{1}'.format(sub_dir.strip('/'), name))
179
180 # Test getting directory model
181 dirmodel = cm.get_model('foo')
182 self.assertEqual(dirmodel['type'], 'directory')
183
184 with self.assertRaises(HTTPError):
185 cm.get_model('foo', type_='file')
186
173
187
174 @dec.skip_win32
188 @dec.skip_win32
175 def test_bad_symlink(self):
189 def test_bad_symlink(self):
176 cm = self.contents_manager
190 cm = self.contents_manager
177 path = 'test bad symlink'
191 path = 'test bad symlink'
178 os_path = self.make_dir(cm.root_dir, path)
192 os_path = self.make_dir(cm.root_dir, path)
179
193
180 file_model = cm.new_untitled(path=path, ext='.txt')
194 file_model = cm.new_untitled(path=path, ext='.txt')
181
195
182 # create a broken symlink
196 # create a broken symlink
183 os.symlink("target", os.path.join(os_path, "bad symlink"))
197 os.symlink("target", os.path.join(os_path, "bad symlink"))
184 model = cm.get_model(path)
198 model = cm.get_model(path)
185 self.assertEqual(model['content'], [file_model])
199 self.assertEqual(model['content'], [file_model])
186
200
187 @dec.skip_win32
201 @dec.skip_win32
188 def test_good_symlink(self):
202 def test_good_symlink(self):
189 cm = self.contents_manager
203 cm = self.contents_manager
190 parent = 'test good symlink'
204 parent = 'test good symlink'
191 name = 'good symlink'
205 name = 'good symlink'
192 path = '{0}/{1}'.format(parent, name)
206 path = '{0}/{1}'.format(parent, name)
193 os_path = self.make_dir(cm.root_dir, parent)
207 os_path = self.make_dir(cm.root_dir, parent)
194
208
195 file_model = cm.new(path=parent + '/zfoo.txt')
209 file_model = cm.new(path=parent + '/zfoo.txt')
196
210
197 # create a good symlink
211 # create a good symlink
198 os.symlink(file_model['name'], os.path.join(os_path, name))
212 os.symlink(file_model['name'], os.path.join(os_path, name))
199 symlink_model = cm.get_model(path, content=False)
213 symlink_model = cm.get_model(path, content=False)
200 dir_model = cm.get_model(parent)
214 dir_model = cm.get_model(parent)
201 self.assertEqual(
215 self.assertEqual(
202 sorted(dir_model['content'], key=lambda x: x['name']),
216 sorted(dir_model['content'], key=lambda x: x['name']),
203 [symlink_model, file_model],
217 [symlink_model, file_model],
204 )
218 )
205
219
206 def test_update(self):
220 def test_update(self):
207 cm = self.contents_manager
221 cm = self.contents_manager
208 # Create a notebook
222 # Create a notebook
209 model = cm.new_untitled(type='notebook')
223 model = cm.new_untitled(type='notebook')
210 name = model['name']
224 name = model['name']
211 path = model['path']
225 path = model['path']
212
226
213 # Change the name in the model for rename
227 # Change the name in the model for rename
214 model['path'] = 'test.ipynb'
228 model['path'] = 'test.ipynb'
215 model = cm.update(model, path)
229 model = cm.update(model, path)
216 assert isinstance(model, dict)
230 assert isinstance(model, dict)
217 self.assertIn('name', model)
231 self.assertIn('name', model)
218 self.assertIn('path', model)
232 self.assertIn('path', model)
219 self.assertEqual(model['name'], 'test.ipynb')
233 self.assertEqual(model['name'], 'test.ipynb')
220
234
221 # Make sure the old name is gone
235 # Make sure the old name is gone
222 self.assertRaises(HTTPError, cm.get_model, path)
236 self.assertRaises(HTTPError, cm.get_model, path)
223
237
224 # Test in sub-directory
238 # Test in sub-directory
225 # Create a directory and notebook in that directory
239 # Create a directory and notebook in that directory
226 sub_dir = '/foo/'
240 sub_dir = '/foo/'
227 self.make_dir(cm.root_dir, 'foo')
241 self.make_dir(cm.root_dir, 'foo')
228 model = cm.new_untitled(path=sub_dir, type='notebook')
242 model = cm.new_untitled(path=sub_dir, type='notebook')
229 name = model['name']
243 name = model['name']
230 path = model['path']
244 path = model['path']
231
245
232 # Change the name in the model for rename
246 # Change the name in the model for rename
233 d = path.rsplit('/', 1)[0]
247 d = path.rsplit('/', 1)[0]
234 new_path = model['path'] = d + '/test_in_sub.ipynb'
248 new_path = model['path'] = d + '/test_in_sub.ipynb'
235 model = cm.update(model, path)
249 model = cm.update(model, path)
236 assert isinstance(model, dict)
250 assert isinstance(model, dict)
237 self.assertIn('name', model)
251 self.assertIn('name', model)
238 self.assertIn('path', model)
252 self.assertIn('path', model)
239 self.assertEqual(model['name'], 'test_in_sub.ipynb')
253 self.assertEqual(model['name'], 'test_in_sub.ipynb')
240 self.assertEqual(model['path'], new_path)
254 self.assertEqual(model['path'], new_path)
241
255
242 # Make sure the old name is gone
256 # Make sure the old name is gone
243 self.assertRaises(HTTPError, cm.get_model, path)
257 self.assertRaises(HTTPError, cm.get_model, path)
244
258
245 def test_save(self):
259 def test_save(self):
246 cm = self.contents_manager
260 cm = self.contents_manager
247 # Create a notebook
261 # Create a notebook
248 model = cm.new_untitled(type='notebook')
262 model = cm.new_untitled(type='notebook')
249 name = model['name']
263 name = model['name']
250 path = model['path']
264 path = model['path']
251
265
252 # Get the model with 'content'
266 # Get the model with 'content'
253 full_model = cm.get_model(path)
267 full_model = cm.get_model(path)
254
268
255 # Save the notebook
269 # Save the notebook
256 model = cm.save(full_model, path)
270 model = cm.save(full_model, path)
257 assert isinstance(model, dict)
271 assert isinstance(model, dict)
258 self.assertIn('name', model)
272 self.assertIn('name', model)
259 self.assertIn('path', model)
273 self.assertIn('path', model)
260 self.assertEqual(model['name'], name)
274 self.assertEqual(model['name'], name)
261 self.assertEqual(model['path'], path)
275 self.assertEqual(model['path'], path)
262
276
263 # Test in sub-directory
277 # Test in sub-directory
264 # Create a directory and notebook in that directory
278 # Create a directory and notebook in that directory
265 sub_dir = '/foo/'
279 sub_dir = '/foo/'
266 self.make_dir(cm.root_dir, 'foo')
280 self.make_dir(cm.root_dir, 'foo')
267 model = cm.new_untitled(path=sub_dir, type='notebook')
281 model = cm.new_untitled(path=sub_dir, type='notebook')
268 name = model['name']
282 name = model['name']
269 path = model['path']
283 path = model['path']
270 model = cm.get_model(path)
284 model = cm.get_model(path)
271
285
272 # Change the name in the model for rename
286 # Change the name in the model for rename
273 model = cm.save(model, path)
287 model = cm.save(model, path)
274 assert isinstance(model, dict)
288 assert isinstance(model, dict)
275 self.assertIn('name', model)
289 self.assertIn('name', model)
276 self.assertIn('path', model)
290 self.assertIn('path', model)
277 self.assertEqual(model['name'], 'Untitled0.ipynb')
291 self.assertEqual(model['name'], 'Untitled0.ipynb')
278 self.assertEqual(model['path'], 'foo/Untitled0.ipynb')
292 self.assertEqual(model['path'], 'foo/Untitled0.ipynb')
279
293
280 def test_delete(self):
294 def test_delete(self):
281 cm = self.contents_manager
295 cm = self.contents_manager
282 # Create a notebook
296 # Create a notebook
283 nb, name, path = self.new_notebook()
297 nb, name, path = self.new_notebook()
284
298
285 # Delete the notebook
299 # Delete the notebook
286 cm.delete(path)
300 cm.delete(path)
287
301
288 # Check that a 'get' on the deleted notebook raises and error
302 # Check that a 'get' on the deleted notebook raises and error
289 self.assertRaises(HTTPError, cm.get_model, path)
303 self.assertRaises(HTTPError, cm.get_model, path)
290
304
291 def test_copy(self):
305 def test_copy(self):
292 cm = self.contents_manager
306 cm = self.contents_manager
293 parent = u'Γ₯ b'
307 parent = u'Γ₯ b'
294 name = u'nb √.ipynb'
308 name = u'nb √.ipynb'
295 path = u'{0}/{1}'.format(parent, name)
309 path = u'{0}/{1}'.format(parent, name)
296 os.mkdir(os.path.join(cm.root_dir, parent))
310 os.mkdir(os.path.join(cm.root_dir, parent))
297 orig = cm.new(path=path)
311 orig = cm.new(path=path)
298
312
299 # copy with unspecified name
313 # copy with unspecified name
300 copy = cm.copy(path)
314 copy = cm.copy(path)
301 self.assertEqual(copy['name'], orig['name'].replace('.ipynb', '-Copy0.ipynb'))
315 self.assertEqual(copy['name'], orig['name'].replace('.ipynb', '-Copy0.ipynb'))
302
316
303 # copy with specified name
317 # copy with specified name
304 copy2 = cm.copy(path, u'Γ₯ b/copy 2.ipynb')
318 copy2 = cm.copy(path, u'Γ₯ b/copy 2.ipynb')
305 self.assertEqual(copy2['name'], u'copy 2.ipynb')
319 self.assertEqual(copy2['name'], u'copy 2.ipynb')
306 self.assertEqual(copy2['path'], u'Γ₯ b/copy 2.ipynb')
320 self.assertEqual(copy2['path'], u'Γ₯ b/copy 2.ipynb')
307
321
308 def test_trust_notebook(self):
322 def test_trust_notebook(self):
309 cm = self.contents_manager
323 cm = self.contents_manager
310 nb, name, path = self.new_notebook()
324 nb, name, path = self.new_notebook()
311
325
312 untrusted = cm.get_model(path)['content']
326 untrusted = cm.get_model(path)['content']
313 assert not cm.notary.check_cells(untrusted)
327 assert not cm.notary.check_cells(untrusted)
314
328
315 # print(untrusted)
329 # print(untrusted)
316 cm.trust_notebook(path)
330 cm.trust_notebook(path)
317 trusted = cm.get_model(path)['content']
331 trusted = cm.get_model(path)['content']
318 # print(trusted)
332 # print(trusted)
319 assert cm.notary.check_cells(trusted)
333 assert cm.notary.check_cells(trusted)
320
334
321 def test_mark_trusted_cells(self):
335 def test_mark_trusted_cells(self):
322 cm = self.contents_manager
336 cm = self.contents_manager
323 nb, name, path = self.new_notebook()
337 nb, name, path = self.new_notebook()
324
338
325 cm.mark_trusted_cells(nb, path)
339 cm.mark_trusted_cells(nb, path)
326 for cell in nb.cells:
340 for cell in nb.cells:
327 if cell.cell_type == 'code':
341 if cell.cell_type == 'code':
328 assert not cell.metadata.trusted
342 assert not cell.metadata.trusted
329
343
330 cm.trust_notebook(path)
344 cm.trust_notebook(path)
331 nb = cm.get_model(path)['content']
345 nb = cm.get_model(path)['content']
332 for cell in nb.cells:
346 for cell in nb.cells:
333 if cell.cell_type == 'code':
347 if cell.cell_type == 'code':
334 assert cell.metadata.trusted
348 assert cell.metadata.trusted
335
349
336 def test_check_and_sign(self):
350 def test_check_and_sign(self):
337 cm = self.contents_manager
351 cm = self.contents_manager
338 nb, name, path = self.new_notebook()
352 nb, name, path = self.new_notebook()
339
353
340 cm.mark_trusted_cells(nb, path)
354 cm.mark_trusted_cells(nb, path)
341 cm.check_and_sign(nb, path)
355 cm.check_and_sign(nb, path)
342 assert not cm.notary.check_signature(nb)
356 assert not cm.notary.check_signature(nb)
343
357
344 cm.trust_notebook(path)
358 cm.trust_notebook(path)
345 nb = cm.get_model(path)['content']
359 nb = cm.get_model(path)['content']
346 cm.mark_trusted_cells(nb, path)
360 cm.mark_trusted_cells(nb, path)
347 cm.check_and_sign(nb, path)
361 cm.check_and_sign(nb, path)
348 assert cm.notary.check_signature(nb)
362 assert cm.notary.check_signature(nb)
General Comments 0
You need to be logged in to leave comments. Login now