##// END OF EJS Templates
Allow specifying format when getting files from contents API
Thomas Kluyver -
Show More
@@ -1,545 +1,559 b''
1 """A contents manager that uses the local file system for storage."""
1 """A contents manager that uses the local file system for storage."""
2
2
3 # Copyright (c) IPython Development Team.
3 # Copyright (c) IPython Development Team.
4 # Distributed under the terms of the Modified BSD License.
4 # Distributed under the terms of the Modified BSD License.
5
5
6 import base64
6 import base64
7 import io
7 import io
8 import os
8 import os
9 import glob
9 import glob
10 import shutil
10 import shutil
11
11
12 from tornado import web
12 from tornado import web
13
13
14 from .manager import ContentsManager
14 from .manager import ContentsManager
15 from IPython import nbformat
15 from IPython import nbformat
16 from IPython.utils.io import atomic_writing
16 from IPython.utils.io import atomic_writing
17 from IPython.utils.path import ensure_dir_exists
17 from IPython.utils.path import ensure_dir_exists
18 from IPython.utils.traitlets import Unicode, Bool, TraitError
18 from IPython.utils.traitlets import Unicode, Bool, TraitError
19 from IPython.utils.py3compat import getcwd
19 from IPython.utils.py3compat import getcwd
20 from IPython.utils import tz
20 from IPython.utils import tz
21 from IPython.html.utils import is_hidden, to_os_path, url_path_join
21 from IPython.html.utils import is_hidden, to_os_path, url_path_join
22
22
23
23
24 class FileContentsManager(ContentsManager):
24 class FileContentsManager(ContentsManager):
25
25
26 root_dir = Unicode(getcwd(), config=True)
26 root_dir = Unicode(getcwd(), config=True)
27
27
28 save_script = Bool(False, config=True, help='DEPRECATED, IGNORED')
28 save_script = Bool(False, config=True, help='DEPRECATED, IGNORED')
29 def _save_script_changed(self):
29 def _save_script_changed(self):
30 self.log.warn("""
30 self.log.warn("""
31 Automatically saving notebooks as scripts has been removed.
31 Automatically saving notebooks as scripts has been removed.
32 Use `ipython nbconvert --to python [notebook]` instead.
32 Use `ipython nbconvert --to python [notebook]` instead.
33 """)
33 """)
34
34
35 def _root_dir_changed(self, name, old, new):
35 def _root_dir_changed(self, name, old, new):
36 """Do a bit of validation of the root_dir."""
36 """Do a bit of validation of the root_dir."""
37 if not os.path.isabs(new):
37 if not os.path.isabs(new):
38 # If we receive a non-absolute path, make it absolute.
38 # If we receive a non-absolute path, make it absolute.
39 self.root_dir = os.path.abspath(new)
39 self.root_dir = os.path.abspath(new)
40 return
40 return
41 if not os.path.isdir(new):
41 if not os.path.isdir(new):
42 raise TraitError("%r is not a directory" % new)
42 raise TraitError("%r is not a directory" % new)
43
43
44 checkpoint_dir = Unicode('.ipynb_checkpoints', config=True,
44 checkpoint_dir = Unicode('.ipynb_checkpoints', config=True,
45 help="""The directory name in which to keep file checkpoints
45 help="""The directory name in which to keep file checkpoints
46
46
47 This is a path relative to the file's own directory.
47 This is a path relative to the file's own directory.
48
48
49 By default, it is .ipynb_checkpoints
49 By default, it is .ipynb_checkpoints
50 """
50 """
51 )
51 )
52
52
53 def _copy(self, src, dest):
53 def _copy(self, src, dest):
54 """copy src to dest
54 """copy src to dest
55
55
56 like shutil.copy2, but log errors in copystat
56 like shutil.copy2, but log errors in copystat
57 """
57 """
58 shutil.copyfile(src, dest)
58 shutil.copyfile(src, dest)
59 try:
59 try:
60 shutil.copystat(src, dest)
60 shutil.copystat(src, dest)
61 except OSError as e:
61 except OSError as e:
62 self.log.debug("copystat on %s failed", dest, exc_info=True)
62 self.log.debug("copystat on %s failed", dest, exc_info=True)
63
63
64 def _get_os_path(self, path):
64 def _get_os_path(self, path):
65 """Given an API path, return its file system path.
65 """Given an API path, return its file system path.
66
66
67 Parameters
67 Parameters
68 ----------
68 ----------
69 path : string
69 path : string
70 The relative API path to the named file.
70 The relative API path to the named file.
71
71
72 Returns
72 Returns
73 -------
73 -------
74 path : string
74 path : string
75 Native, absolute OS path to for a file.
75 Native, absolute OS path to for a file.
76 """
76 """
77 return to_os_path(path, self.root_dir)
77 return to_os_path(path, self.root_dir)
78
78
79 def dir_exists(self, path):
79 def dir_exists(self, path):
80 """Does the API-style path refer to an extant directory?
80 """Does the API-style path refer to an extant directory?
81
81
82 API-style wrapper for os.path.isdir
82 API-style wrapper for os.path.isdir
83
83
84 Parameters
84 Parameters
85 ----------
85 ----------
86 path : string
86 path : string
87 The path to check. This is an API path (`/` separated,
87 The path to check. This is an API path (`/` separated,
88 relative to root_dir).
88 relative to root_dir).
89
89
90 Returns
90 Returns
91 -------
91 -------
92 exists : bool
92 exists : bool
93 Whether the path is indeed a directory.
93 Whether the path is indeed a directory.
94 """
94 """
95 path = path.strip('/')
95 path = path.strip('/')
96 os_path = self._get_os_path(path=path)
96 os_path = self._get_os_path(path=path)
97 return os.path.isdir(os_path)
97 return os.path.isdir(os_path)
98
98
99 def is_hidden(self, path):
99 def is_hidden(self, path):
100 """Does the API style path correspond to a hidden directory or file?
100 """Does the API style path correspond to a hidden directory or file?
101
101
102 Parameters
102 Parameters
103 ----------
103 ----------
104 path : string
104 path : string
105 The path to check. This is an API path (`/` separated,
105 The path to check. This is an API path (`/` separated,
106 relative to root_dir).
106 relative to root_dir).
107
107
108 Returns
108 Returns
109 -------
109 -------
110 hidden : bool
110 hidden : bool
111 Whether the path exists and is hidden.
111 Whether the path exists and is hidden.
112 """
112 """
113 path = path.strip('/')
113 path = path.strip('/')
114 os_path = self._get_os_path(path=path)
114 os_path = self._get_os_path(path=path)
115 return is_hidden(os_path, self.root_dir)
115 return is_hidden(os_path, self.root_dir)
116
116
117 def file_exists(self, path):
117 def file_exists(self, path):
118 """Returns True if the file exists, else returns False.
118 """Returns True if the file exists, else returns False.
119
119
120 API-style wrapper for os.path.isfile
120 API-style wrapper for os.path.isfile
121
121
122 Parameters
122 Parameters
123 ----------
123 ----------
124 path : string
124 path : string
125 The relative path to the file (with '/' as separator)
125 The relative path to the file (with '/' as separator)
126
126
127 Returns
127 Returns
128 -------
128 -------
129 exists : bool
129 exists : bool
130 Whether the file exists.
130 Whether the file exists.
131 """
131 """
132 path = path.strip('/')
132 path = path.strip('/')
133 os_path = self._get_os_path(path)
133 os_path = self._get_os_path(path)
134 return os.path.isfile(os_path)
134 return os.path.isfile(os_path)
135
135
136 def exists(self, path):
136 def exists(self, path):
137 """Returns True if the path exists, else returns False.
137 """Returns True if the path exists, else returns False.
138
138
139 API-style wrapper for os.path.exists
139 API-style wrapper for os.path.exists
140
140
141 Parameters
141 Parameters
142 ----------
142 ----------
143 path : string
143 path : string
144 The API path to the file (with '/' as separator)
144 The API path to the file (with '/' as separator)
145
145
146 Returns
146 Returns
147 -------
147 -------
148 exists : bool
148 exists : bool
149 Whether the target exists.
149 Whether the target exists.
150 """
150 """
151 path = path.strip('/')
151 path = path.strip('/')
152 os_path = self._get_os_path(path=path)
152 os_path = self._get_os_path(path=path)
153 return os.path.exists(os_path)
153 return os.path.exists(os_path)
154
154
155 def _base_model(self, path):
155 def _base_model(self, path):
156 """Build the common base of a contents model"""
156 """Build the common base of a contents model"""
157 os_path = self._get_os_path(path)
157 os_path = self._get_os_path(path)
158 info = os.stat(os_path)
158 info = os.stat(os_path)
159 last_modified = tz.utcfromtimestamp(info.st_mtime)
159 last_modified = tz.utcfromtimestamp(info.st_mtime)
160 created = tz.utcfromtimestamp(info.st_ctime)
160 created = tz.utcfromtimestamp(info.st_ctime)
161 # Create the base model.
161 # Create the base model.
162 model = {}
162 model = {}
163 model['name'] = path.rsplit('/', 1)[-1]
163 model['name'] = path.rsplit('/', 1)[-1]
164 model['path'] = path
164 model['path'] = path
165 model['last_modified'] = last_modified
165 model['last_modified'] = last_modified
166 model['created'] = created
166 model['created'] = created
167 model['content'] = None
167 model['content'] = None
168 model['format'] = None
168 model['format'] = None
169 return model
169 return model
170
170
171 def _dir_model(self, path, content=True):
171 def _dir_model(self, path, content=True):
172 """Build a model for a directory
172 """Build a model for a directory
173
173
174 if content is requested, will include a listing of the directory
174 if content is requested, will include a listing of the directory
175 """
175 """
176 os_path = self._get_os_path(path)
176 os_path = self._get_os_path(path)
177
177
178 four_o_four = u'directory does not exist: %r' % os_path
178 four_o_four = u'directory does not exist: %r' % os_path
179
179
180 if not os.path.isdir(os_path):
180 if not os.path.isdir(os_path):
181 raise web.HTTPError(404, four_o_four)
181 raise web.HTTPError(404, four_o_four)
182 elif is_hidden(os_path, self.root_dir):
182 elif is_hidden(os_path, self.root_dir):
183 self.log.info("Refusing to serve hidden directory %r, via 404 Error",
183 self.log.info("Refusing to serve hidden directory %r, via 404 Error",
184 os_path
184 os_path
185 )
185 )
186 raise web.HTTPError(404, four_o_four)
186 raise web.HTTPError(404, four_o_four)
187
187
188 model = self._base_model(path)
188 model = self._base_model(path)
189 model['type'] = 'directory'
189 model['type'] = 'directory'
190 if content:
190 if content:
191 model['content'] = contents = []
191 model['content'] = contents = []
192 os_dir = self._get_os_path(path)
192 os_dir = self._get_os_path(path)
193 for name in os.listdir(os_dir):
193 for name in os.listdir(os_dir):
194 os_path = os.path.join(os_dir, name)
194 os_path = os.path.join(os_dir, name)
195 # skip over broken symlinks in listing
195 # skip over broken symlinks in listing
196 if not os.path.exists(os_path):
196 if not os.path.exists(os_path):
197 self.log.warn("%s doesn't exist", os_path)
197 self.log.warn("%s doesn't exist", os_path)
198 continue
198 continue
199 elif not os.path.isfile(os_path) and not os.path.isdir(os_path):
199 elif not os.path.isfile(os_path) and not os.path.isdir(os_path):
200 self.log.debug("%s not a regular file", os_path)
200 self.log.debug("%s not a regular file", os_path)
201 continue
201 continue
202 if self.should_list(name) and not is_hidden(os_path, self.root_dir):
202 if self.should_list(name) and not is_hidden(os_path, self.root_dir):
203 contents.append(self.get_model(
203 contents.append(self.get_model(
204 path='%s/%s' % (path, name),
204 path='%s/%s' % (path, name),
205 content=False)
205 content=False)
206 )
206 )
207
207
208 model['format'] = 'json'
208 model['format'] = 'json'
209
209
210 return model
210 return model
211
211
212 def _file_model(self, path, content=True):
212 def _file_model(self, path, content=True, format=None):
213 """Build a model for a file
213 """Build a model for a file
214
214
215 if content is requested, include the file contents.
215 if content is requested, include the file contents.
216 UTF-8 text files will be unicode, binary files will be base64-encoded.
216
217 format:
218 If 'text', the contents will be decoded as UTF-8.
219 If 'base64', the raw bytes contents will be encoded as base64.
220 If not specified, try to decode as UTF-8, and fall back to base64
217 """
221 """
218 model = self._base_model(path)
222 model = self._base_model(path)
219 model['type'] = 'file'
223 model['type'] = 'file'
220 if content:
224 if content:
221 os_path = self._get_os_path(path)
225 os_path = self._get_os_path(path)
222 if not os.path.isfile(os_path):
226 if not os.path.isfile(os_path):
223 # could be FIFO
227 # could be FIFO
224 raise web.HTTPError(400, "Cannot get content of non-file %s" % os_path)
228 raise web.HTTPError(400, "Cannot get content of non-file %s" % os_path)
225 with io.open(os_path, 'rb') as f:
229 with io.open(os_path, 'rb') as f:
226 bcontent = f.read()
230 bcontent = f.read()
227 try:
231
228 model['content'] = bcontent.decode('utf8')
232 if format != 'base64':
229 except UnicodeError as e:
233 try:
234 model['content'] = bcontent.decode('utf8')
235 except UnicodeError as e:
236 if format == 'text':
237 raise web.HTTPError(400, "%s is not UTF-8 encoded" % path)
238 else:
239 model['format'] = 'text'
240
241 if model['content'] is None:
230 model['content'] = base64.encodestring(bcontent).decode('ascii')
242 model['content'] = base64.encodestring(bcontent).decode('ascii')
231 model['format'] = 'base64'
243 model['format'] = 'base64'
232 else:
244
233 model['format'] = 'text'
234 return model
245 return model
235
246
236
247
237 def _notebook_model(self, path, content=True):
248 def _notebook_model(self, path, content=True):
238 """Build a notebook model
249 """Build a notebook model
239
250
240 if content is requested, the notebook content will be populated
251 if content is requested, the notebook content will be populated
241 as a JSON structure (not double-serialized)
252 as a JSON structure (not double-serialized)
242 """
253 """
243 model = self._base_model(path)
254 model = self._base_model(path)
244 model['type'] = 'notebook'
255 model['type'] = 'notebook'
245 if content:
256 if content:
246 os_path = self._get_os_path(path)
257 os_path = self._get_os_path(path)
247 with io.open(os_path, 'r', encoding='utf-8') as f:
258 with io.open(os_path, 'r', encoding='utf-8') as f:
248 try:
259 try:
249 nb = nbformat.read(f, as_version=4)
260 nb = nbformat.read(f, as_version=4)
250 except Exception as e:
261 except Exception as e:
251 raise web.HTTPError(400, u"Unreadable Notebook: %s %r" % (os_path, e))
262 raise web.HTTPError(400, u"Unreadable Notebook: %s %r" % (os_path, e))
252 self.mark_trusted_cells(nb, path)
263 self.mark_trusted_cells(nb, path)
253 model['content'] = nb
264 model['content'] = nb
254 model['format'] = 'json'
265 model['format'] = 'json'
255 self.validate_notebook_model(model)
266 self.validate_notebook_model(model)
256 return model
267 return model
257
268
258 def get_model(self, path, content=True, type_=None):
269 def get_model(self, path, content=True, type_=None, format=None):
259 """ Takes a path for an entity and returns its model
270 """ Takes a path for an entity and returns its model
260
271
261 Parameters
272 Parameters
262 ----------
273 ----------
263 path : str
274 path : str
264 the API path that describes the relative path for the target
275 the API path that describes the relative path for the target
265 content : bool
276 content : bool
266 Whether to include the contents in the reply
277 Whether to include the contents in the reply
267 type_ : str, optional
278 type_ : str, optional
268 The requested type - 'file', 'notebook', or 'directory'.
279 The requested type - 'file', 'notebook', or 'directory'.
269 Will raise HTTPError 406 if the content doesn't match.
280 Will raise HTTPError 406 if the content doesn't match.
281 format : str, optional
282 The requested format for file contents. 'text' or 'base64'.
283 Ignored if this returns a notebook or directory model.
270
284
271 Returns
285 Returns
272 -------
286 -------
273 model : dict
287 model : dict
274 the contents model. If content=True, returns the contents
288 the contents model. If content=True, returns the contents
275 of the file or directory as well.
289 of the file or directory as well.
276 """
290 """
277 path = path.strip('/')
291 path = path.strip('/')
278
292
279 if not self.exists(path):
293 if not self.exists(path):
280 raise web.HTTPError(404, u'No such file or directory: %s' % path)
294 raise web.HTTPError(404, u'No such file or directory: %s' % path)
281
295
282 os_path = self._get_os_path(path)
296 os_path = self._get_os_path(path)
283 if os.path.isdir(os_path):
297 if os.path.isdir(os_path):
284 if type_ not in (None, 'directory'):
298 if type_ not in (None, 'directory'):
285 raise web.HTTPError(400,
299 raise web.HTTPError(400,
286 u'%s is a directory, not a %s' % (path, type_))
300 u'%s is a directory, not a %s' % (path, type_))
287 model = self._dir_model(path, content=content)
301 model = self._dir_model(path, content=content)
288 elif type_ == 'notebook' or (type_ is None and path.endswith('.ipynb')):
302 elif type_ == 'notebook' or (type_ is None and path.endswith('.ipynb')):
289 model = self._notebook_model(path, content=content)
303 model = self._notebook_model(path, content=content)
290 else:
304 else:
291 if type_ == 'directory':
305 if type_ == 'directory':
292 raise web.HTTPError(400,
306 raise web.HTTPError(400,
293 u'%s is not a directory')
307 u'%s is not a directory')
294 model = self._file_model(path, content=content)
308 model = self._file_model(path, content=content, format=format)
295 return model
309 return model
296
310
297 def _save_notebook(self, os_path, model, path=''):
311 def _save_notebook(self, os_path, model, path=''):
298 """save a notebook file"""
312 """save a notebook file"""
299 # Save the notebook file
313 # Save the notebook file
300 nb = nbformat.from_dict(model['content'])
314 nb = nbformat.from_dict(model['content'])
301
315
302 self.check_and_sign(nb, path)
316 self.check_and_sign(nb, path)
303
317
304 with atomic_writing(os_path, encoding='utf-8') as f:
318 with atomic_writing(os_path, encoding='utf-8') as f:
305 nbformat.write(nb, f, version=nbformat.NO_CONVERT)
319 nbformat.write(nb, f, version=nbformat.NO_CONVERT)
306
320
307 def _save_file(self, os_path, model, path=''):
321 def _save_file(self, os_path, model, path=''):
308 """save a non-notebook file"""
322 """save a non-notebook file"""
309 fmt = model.get('format', None)
323 fmt = model.get('format', None)
310 if fmt not in {'text', 'base64'}:
324 if fmt not in {'text', 'base64'}:
311 raise web.HTTPError(400, "Must specify format of file contents as 'text' or 'base64'")
325 raise web.HTTPError(400, "Must specify format of file contents as 'text' or 'base64'")
312 try:
326 try:
313 content = model['content']
327 content = model['content']
314 if fmt == 'text':
328 if fmt == 'text':
315 bcontent = content.encode('utf8')
329 bcontent = content.encode('utf8')
316 else:
330 else:
317 b64_bytes = content.encode('ascii')
331 b64_bytes = content.encode('ascii')
318 bcontent = base64.decodestring(b64_bytes)
332 bcontent = base64.decodestring(b64_bytes)
319 except Exception as e:
333 except Exception as e:
320 raise web.HTTPError(400, u'Encoding error saving %s: %s' % (os_path, e))
334 raise web.HTTPError(400, u'Encoding error saving %s: %s' % (os_path, e))
321 with atomic_writing(os_path, text=False) as f:
335 with atomic_writing(os_path, text=False) as f:
322 f.write(bcontent)
336 f.write(bcontent)
323
337
324 def _save_directory(self, os_path, model, path=''):
338 def _save_directory(self, os_path, model, path=''):
325 """create a directory"""
339 """create a directory"""
326 if is_hidden(os_path, self.root_dir):
340 if is_hidden(os_path, self.root_dir):
327 raise web.HTTPError(400, u'Cannot create hidden directory %r' % os_path)
341 raise web.HTTPError(400, u'Cannot create hidden directory %r' % os_path)
328 if not os.path.exists(os_path):
342 if not os.path.exists(os_path):
329 os.mkdir(os_path)
343 os.mkdir(os_path)
330 elif not os.path.isdir(os_path):
344 elif not os.path.isdir(os_path):
331 raise web.HTTPError(400, u'Not a directory: %s' % (os_path))
345 raise web.HTTPError(400, u'Not a directory: %s' % (os_path))
332 else:
346 else:
333 self.log.debug("Directory %r already exists", os_path)
347 self.log.debug("Directory %r already exists", os_path)
334
348
335 def save(self, model, path=''):
349 def save(self, model, path=''):
336 """Save the file model and return the model with no content."""
350 """Save the file model and return the model with no content."""
337 path = path.strip('/')
351 path = path.strip('/')
338
352
339 if 'type' not in model:
353 if 'type' not in model:
340 raise web.HTTPError(400, u'No file type provided')
354 raise web.HTTPError(400, u'No file type provided')
341 if 'content' not in model and model['type'] != 'directory':
355 if 'content' not in model and model['type'] != 'directory':
342 raise web.HTTPError(400, u'No file content provided')
356 raise web.HTTPError(400, u'No file content provided')
343
357
344 # One checkpoint should always exist
358 # One checkpoint should always exist
345 if self.file_exists(path) and not self.list_checkpoints(path):
359 if self.file_exists(path) and not self.list_checkpoints(path):
346 self.create_checkpoint(path)
360 self.create_checkpoint(path)
347
361
348 os_path = self._get_os_path(path)
362 os_path = self._get_os_path(path)
349 self.log.debug("Saving %s", os_path)
363 self.log.debug("Saving %s", os_path)
350 try:
364 try:
351 if model['type'] == 'notebook':
365 if model['type'] == 'notebook':
352 self._save_notebook(os_path, model, path)
366 self._save_notebook(os_path, model, path)
353 elif model['type'] == 'file':
367 elif model['type'] == 'file':
354 self._save_file(os_path, model, path)
368 self._save_file(os_path, model, path)
355 elif model['type'] == 'directory':
369 elif model['type'] == 'directory':
356 self._save_directory(os_path, model, path)
370 self._save_directory(os_path, model, path)
357 else:
371 else:
358 raise web.HTTPError(400, "Unhandled contents type: %s" % model['type'])
372 raise web.HTTPError(400, "Unhandled contents type: %s" % model['type'])
359 except web.HTTPError:
373 except web.HTTPError:
360 raise
374 raise
361 except Exception as e:
375 except Exception as e:
362 raise web.HTTPError(400, u'Unexpected error while saving file: %s %s' % (os_path, e))
376 raise web.HTTPError(400, u'Unexpected error while saving file: %s %s' % (os_path, e))
363
377
364 validation_message = None
378 validation_message = None
365 if model['type'] == 'notebook':
379 if model['type'] == 'notebook':
366 self.validate_notebook_model(model)
380 self.validate_notebook_model(model)
367 validation_message = model.get('message', None)
381 validation_message = model.get('message', None)
368
382
369 model = self.get_model(path, content=False)
383 model = self.get_model(path, content=False)
370 if validation_message:
384 if validation_message:
371 model['message'] = validation_message
385 model['message'] = validation_message
372 return model
386 return model
373
387
374 def update(self, model, path):
388 def update(self, model, path):
375 """Update the file's path
389 """Update the file's path
376
390
377 For use in PATCH requests, to enable renaming a file without
391 For use in PATCH requests, to enable renaming a file without
378 re-uploading its contents. Only used for renaming at the moment.
392 re-uploading its contents. Only used for renaming at the moment.
379 """
393 """
380 path = path.strip('/')
394 path = path.strip('/')
381 new_path = model.get('path', path).strip('/')
395 new_path = model.get('path', path).strip('/')
382 if path != new_path:
396 if path != new_path:
383 self.rename(path, new_path)
397 self.rename(path, new_path)
384 model = self.get_model(new_path, content=False)
398 model = self.get_model(new_path, content=False)
385 return model
399 return model
386
400
387 def delete(self, path):
401 def delete(self, path):
388 """Delete file at path."""
402 """Delete file at path."""
389 path = path.strip('/')
403 path = path.strip('/')
390 os_path = self._get_os_path(path)
404 os_path = self._get_os_path(path)
391 rm = os.unlink
405 rm = os.unlink
392 if os.path.isdir(os_path):
406 if os.path.isdir(os_path):
393 listing = os.listdir(os_path)
407 listing = os.listdir(os_path)
394 # don't delete non-empty directories (checkpoints dir doesn't count)
408 # don't delete non-empty directories (checkpoints dir doesn't count)
395 if listing and listing != [self.checkpoint_dir]:
409 if listing and listing != [self.checkpoint_dir]:
396 raise web.HTTPError(400, u'Directory %s not empty' % os_path)
410 raise web.HTTPError(400, u'Directory %s not empty' % os_path)
397 elif not os.path.isfile(os_path):
411 elif not os.path.isfile(os_path):
398 raise web.HTTPError(404, u'File does not exist: %s' % os_path)
412 raise web.HTTPError(404, u'File does not exist: %s' % os_path)
399
413
400 # clear checkpoints
414 # clear checkpoints
401 for checkpoint in self.list_checkpoints(path):
415 for checkpoint in self.list_checkpoints(path):
402 checkpoint_id = checkpoint['id']
416 checkpoint_id = checkpoint['id']
403 cp_path = self.get_checkpoint_path(checkpoint_id, path)
417 cp_path = self.get_checkpoint_path(checkpoint_id, path)
404 if os.path.isfile(cp_path):
418 if os.path.isfile(cp_path):
405 self.log.debug("Unlinking checkpoint %s", cp_path)
419 self.log.debug("Unlinking checkpoint %s", cp_path)
406 os.unlink(cp_path)
420 os.unlink(cp_path)
407
421
408 if os.path.isdir(os_path):
422 if os.path.isdir(os_path):
409 self.log.debug("Removing directory %s", os_path)
423 self.log.debug("Removing directory %s", os_path)
410 shutil.rmtree(os_path)
424 shutil.rmtree(os_path)
411 else:
425 else:
412 self.log.debug("Unlinking file %s", os_path)
426 self.log.debug("Unlinking file %s", os_path)
413 rm(os_path)
427 rm(os_path)
414
428
415 def rename(self, old_path, new_path):
429 def rename(self, old_path, new_path):
416 """Rename a file."""
430 """Rename a file."""
417 old_path = old_path.strip('/')
431 old_path = old_path.strip('/')
418 new_path = new_path.strip('/')
432 new_path = new_path.strip('/')
419 if new_path == old_path:
433 if new_path == old_path:
420 return
434 return
421
435
422 new_os_path = self._get_os_path(new_path)
436 new_os_path = self._get_os_path(new_path)
423 old_os_path = self._get_os_path(old_path)
437 old_os_path = self._get_os_path(old_path)
424
438
425 # Should we proceed with the move?
439 # Should we proceed with the move?
426 if os.path.exists(new_os_path):
440 if os.path.exists(new_os_path):
427 raise web.HTTPError(409, u'File already exists: %s' % new_path)
441 raise web.HTTPError(409, u'File already exists: %s' % new_path)
428
442
429 # Move the file
443 # Move the file
430 try:
444 try:
431 shutil.move(old_os_path, new_os_path)
445 shutil.move(old_os_path, new_os_path)
432 except Exception as e:
446 except Exception as e:
433 raise web.HTTPError(500, u'Unknown error renaming file: %s %s' % (old_path, e))
447 raise web.HTTPError(500, u'Unknown error renaming file: %s %s' % (old_path, e))
434
448
435 # Move the checkpoints
449 # Move the checkpoints
436 old_checkpoints = self.list_checkpoints(old_path)
450 old_checkpoints = self.list_checkpoints(old_path)
437 for cp in old_checkpoints:
451 for cp in old_checkpoints:
438 checkpoint_id = cp['id']
452 checkpoint_id = cp['id']
439 old_cp_path = self.get_checkpoint_path(checkpoint_id, old_path)
453 old_cp_path = self.get_checkpoint_path(checkpoint_id, old_path)
440 new_cp_path = self.get_checkpoint_path(checkpoint_id, new_path)
454 new_cp_path = self.get_checkpoint_path(checkpoint_id, new_path)
441 if os.path.isfile(old_cp_path):
455 if os.path.isfile(old_cp_path):
442 self.log.debug("Renaming checkpoint %s -> %s", old_cp_path, new_cp_path)
456 self.log.debug("Renaming checkpoint %s -> %s", old_cp_path, new_cp_path)
443 shutil.move(old_cp_path, new_cp_path)
457 shutil.move(old_cp_path, new_cp_path)
444
458
445 # Checkpoint-related utilities
459 # Checkpoint-related utilities
446
460
447 def get_checkpoint_path(self, checkpoint_id, path):
461 def get_checkpoint_path(self, checkpoint_id, path):
448 """find the path to a checkpoint"""
462 """find the path to a checkpoint"""
449 path = path.strip('/')
463 path = path.strip('/')
450 parent, name = ('/' + path).rsplit('/', 1)
464 parent, name = ('/' + path).rsplit('/', 1)
451 parent = parent.strip('/')
465 parent = parent.strip('/')
452 basename, ext = os.path.splitext(name)
466 basename, ext = os.path.splitext(name)
453 filename = u"{name}-{checkpoint_id}{ext}".format(
467 filename = u"{name}-{checkpoint_id}{ext}".format(
454 name=basename,
468 name=basename,
455 checkpoint_id=checkpoint_id,
469 checkpoint_id=checkpoint_id,
456 ext=ext,
470 ext=ext,
457 )
471 )
458 os_path = self._get_os_path(path=parent)
472 os_path = self._get_os_path(path=parent)
459 cp_dir = os.path.join(os_path, self.checkpoint_dir)
473 cp_dir = os.path.join(os_path, self.checkpoint_dir)
460 ensure_dir_exists(cp_dir)
474 ensure_dir_exists(cp_dir)
461 cp_path = os.path.join(cp_dir, filename)
475 cp_path = os.path.join(cp_dir, filename)
462 return cp_path
476 return cp_path
463
477
464 def get_checkpoint_model(self, checkpoint_id, path):
478 def get_checkpoint_model(self, checkpoint_id, path):
465 """construct the info dict for a given checkpoint"""
479 """construct the info dict for a given checkpoint"""
466 path = path.strip('/')
480 path = path.strip('/')
467 cp_path = self.get_checkpoint_path(checkpoint_id, path)
481 cp_path = self.get_checkpoint_path(checkpoint_id, path)
468 stats = os.stat(cp_path)
482 stats = os.stat(cp_path)
469 last_modified = tz.utcfromtimestamp(stats.st_mtime)
483 last_modified = tz.utcfromtimestamp(stats.st_mtime)
470 info = dict(
484 info = dict(
471 id = checkpoint_id,
485 id = checkpoint_id,
472 last_modified = last_modified,
486 last_modified = last_modified,
473 )
487 )
474 return info
488 return info
475
489
476 # public checkpoint API
490 # public checkpoint API
477
491
478 def create_checkpoint(self, path):
492 def create_checkpoint(self, path):
479 """Create a checkpoint from the current state of a file"""
493 """Create a checkpoint from the current state of a file"""
480 path = path.strip('/')
494 path = path.strip('/')
481 if not self.file_exists(path):
495 if not self.file_exists(path):
482 raise web.HTTPError(404)
496 raise web.HTTPError(404)
483 src_path = self._get_os_path(path)
497 src_path = self._get_os_path(path)
484 # only the one checkpoint ID:
498 # only the one checkpoint ID:
485 checkpoint_id = u"checkpoint"
499 checkpoint_id = u"checkpoint"
486 cp_path = self.get_checkpoint_path(checkpoint_id, path)
500 cp_path = self.get_checkpoint_path(checkpoint_id, path)
487 self.log.debug("creating checkpoint for %s", path)
501 self.log.debug("creating checkpoint for %s", path)
488 self._copy(src_path, cp_path)
502 self._copy(src_path, cp_path)
489
503
490 # return the checkpoint info
504 # return the checkpoint info
491 return self.get_checkpoint_model(checkpoint_id, path)
505 return self.get_checkpoint_model(checkpoint_id, path)
492
506
493 def list_checkpoints(self, path):
507 def list_checkpoints(self, path):
494 """list the checkpoints for a given file
508 """list the checkpoints for a given file
495
509
496 This contents manager currently only supports one checkpoint per file.
510 This contents manager currently only supports one checkpoint per file.
497 """
511 """
498 path = path.strip('/')
512 path = path.strip('/')
499 checkpoint_id = "checkpoint"
513 checkpoint_id = "checkpoint"
500 os_path = self.get_checkpoint_path(checkpoint_id, path)
514 os_path = self.get_checkpoint_path(checkpoint_id, path)
501 if not os.path.exists(os_path):
515 if not os.path.exists(os_path):
502 return []
516 return []
503 else:
517 else:
504 return [self.get_checkpoint_model(checkpoint_id, path)]
518 return [self.get_checkpoint_model(checkpoint_id, path)]
505
519
506
520
507 def restore_checkpoint(self, checkpoint_id, path):
521 def restore_checkpoint(self, checkpoint_id, path):
508 """restore a file to a checkpointed state"""
522 """restore a file to a checkpointed state"""
509 path = path.strip('/')
523 path = path.strip('/')
510 self.log.info("restoring %s from checkpoint %s", path, checkpoint_id)
524 self.log.info("restoring %s from checkpoint %s", path, checkpoint_id)
511 nb_path = self._get_os_path(path)
525 nb_path = self._get_os_path(path)
512 cp_path = self.get_checkpoint_path(checkpoint_id, path)
526 cp_path = self.get_checkpoint_path(checkpoint_id, path)
513 if not os.path.isfile(cp_path):
527 if not os.path.isfile(cp_path):
514 self.log.debug("checkpoint file does not exist: %s", cp_path)
528 self.log.debug("checkpoint file does not exist: %s", cp_path)
515 raise web.HTTPError(404,
529 raise web.HTTPError(404,
516 u'checkpoint does not exist: %s@%s' % (path, checkpoint_id)
530 u'checkpoint does not exist: %s@%s' % (path, checkpoint_id)
517 )
531 )
518 # ensure notebook is readable (never restore from an unreadable notebook)
532 # ensure notebook is readable (never restore from an unreadable notebook)
519 if cp_path.endswith('.ipynb'):
533 if cp_path.endswith('.ipynb'):
520 with io.open(cp_path, 'r', encoding='utf-8') as f:
534 with io.open(cp_path, 'r', encoding='utf-8') as f:
521 nbformat.read(f, as_version=4)
535 nbformat.read(f, as_version=4)
522 self._copy(cp_path, nb_path)
536 self._copy(cp_path, nb_path)
523 self.log.debug("copying %s -> %s", cp_path, nb_path)
537 self.log.debug("copying %s -> %s", cp_path, nb_path)
524
538
525 def delete_checkpoint(self, checkpoint_id, path):
539 def delete_checkpoint(self, checkpoint_id, path):
526 """delete a file's checkpoint"""
540 """delete a file's checkpoint"""
527 path = path.strip('/')
541 path = path.strip('/')
528 cp_path = self.get_checkpoint_path(checkpoint_id, path)
542 cp_path = self.get_checkpoint_path(checkpoint_id, path)
529 if not os.path.isfile(cp_path):
543 if not os.path.isfile(cp_path):
530 raise web.HTTPError(404,
544 raise web.HTTPError(404,
531 u'Checkpoint does not exist: %s@%s' % (path, checkpoint_id)
545 u'Checkpoint does not exist: %s@%s' % (path, checkpoint_id)
532 )
546 )
533 self.log.debug("unlinking %s", cp_path)
547 self.log.debug("unlinking %s", cp_path)
534 os.unlink(cp_path)
548 os.unlink(cp_path)
535
549
536 def info_string(self):
550 def info_string(self):
537 return "Serving notebooks from local directory: %s" % self.root_dir
551 return "Serving notebooks from local directory: %s" % self.root_dir
538
552
539 def get_kernel_path(self, path, model=None):
553 def get_kernel_path(self, path, model=None):
540 """Return the initial working dir a kernel associated with a given notebook"""
554 """Return the initial working dir a kernel associated with a given notebook"""
541 if '/' in path:
555 if '/' in path:
542 parent_dir = path.rsplit('/', 1)[0]
556 parent_dir = path.rsplit('/', 1)[0]
543 else:
557 else:
544 parent_dir = ''
558 parent_dir = ''
545 return self._get_os_path(parent_dir)
559 return self._get_os_path(parent_dir)
@@ -1,261 +1,265 b''
1 """Tornado handlers for the contents web service."""
1 """Tornado handlers for the contents web service."""
2
2
3 # Copyright (c) IPython Development Team.
3 # Copyright (c) IPython Development Team.
4 # Distributed under the terms of the Modified BSD License.
4 # Distributed under the terms of the Modified BSD License.
5
5
6 import json
6 import json
7
7
8 from tornado import web
8 from tornado import web
9
9
10 from IPython.html.utils import url_path_join, url_escape
10 from IPython.html.utils import url_path_join, url_escape
11 from IPython.utils.jsonutil import date_default
11 from IPython.utils.jsonutil import date_default
12
12
13 from IPython.html.base.handlers import (
13 from IPython.html.base.handlers import (
14 IPythonHandler, json_errors, path_regex,
14 IPythonHandler, json_errors, path_regex,
15 )
15 )
16
16
17
17
18 def sort_key(model):
18 def sort_key(model):
19 """key function for case-insensitive sort by name and type"""
19 """key function for case-insensitive sort by name and type"""
20 iname = model['name'].lower()
20 iname = model['name'].lower()
21 type_key = {
21 type_key = {
22 'directory' : '0',
22 'directory' : '0',
23 'notebook' : '1',
23 'notebook' : '1',
24 'file' : '2',
24 'file' : '2',
25 }.get(model['type'], '9')
25 }.get(model['type'], '9')
26 return u'%s%s' % (type_key, iname)
26 return u'%s%s' % (type_key, iname)
27
27
28 class ContentsHandler(IPythonHandler):
28 class ContentsHandler(IPythonHandler):
29
29
30 SUPPORTED_METHODS = (u'GET', u'PUT', u'PATCH', u'POST', u'DELETE')
30 SUPPORTED_METHODS = (u'GET', u'PUT', u'PATCH', u'POST', u'DELETE')
31
31
32 def location_url(self, path):
32 def location_url(self, path):
33 """Return the full URL location of a file.
33 """Return the full URL location of a file.
34
34
35 Parameters
35 Parameters
36 ----------
36 ----------
37 path : unicode
37 path : unicode
38 The API path of the file, such as "foo/bar.txt".
38 The API path of the file, such as "foo/bar.txt".
39 """
39 """
40 return url_escape(url_path_join(
40 return url_escape(url_path_join(
41 self.base_url, 'api', 'contents', path
41 self.base_url, 'api', 'contents', path
42 ))
42 ))
43
43
44 def _finish_model(self, model, location=True):
44 def _finish_model(self, model, location=True):
45 """Finish a JSON request with a model, setting relevant headers, etc."""
45 """Finish a JSON request with a model, setting relevant headers, etc."""
46 if location:
46 if location:
47 location = self.location_url(model['path'])
47 location = self.location_url(model['path'])
48 self.set_header('Location', location)
48 self.set_header('Location', location)
49 self.set_header('Last-Modified', model['last_modified'])
49 self.set_header('Last-Modified', model['last_modified'])
50 self.finish(json.dumps(model, default=date_default))
50 self.finish(json.dumps(model, default=date_default))
51
51
52 @web.authenticated
52 @web.authenticated
53 @json_errors
53 @json_errors
54 def get(self, path=''):
54 def get(self, path=''):
55 """Return a model for a file or directory.
55 """Return a model for a file or directory.
56
56
57 A directory model contains a list of models (without content)
57 A directory model contains a list of models (without content)
58 of the files and directories it contains.
58 of the files and directories it contains.
59 """
59 """
60 path = path or ''
60 path = path or ''
61 type_ = self.get_query_argument('type', default=None)
61 type_ = self.get_query_argument('type', default=None)
62 if type_ not in {None, 'directory', 'file', 'notebook'}:
62 if type_ not in {None, 'directory', 'file', 'notebook'}:
63 raise web.HTTPError(400, u'Type %r is invalid' % type_)
63 raise web.HTTPError(400, u'Type %r is invalid' % type_)
64
64
65 model = self.contents_manager.get_model(path=path, type_=type_)
65 format = self.get_query_argument('format', default=None)#
66 if format not in {None, 'text', 'base64'}:
67 raise web.HTTPError(400, u'Format %r is invalid' % format)
68
69 model = self.contents_manager.get_model(path=path, type_=type_, format=format)
66 if model['type'] == 'directory':
70 if model['type'] == 'directory':
67 # group listing by type, then by name (case-insensitive)
71 # group listing by type, then by name (case-insensitive)
68 # FIXME: sorting should be done in the frontends
72 # FIXME: sorting should be done in the frontends
69 model['content'].sort(key=sort_key)
73 model['content'].sort(key=sort_key)
70 self._finish_model(model, location=False)
74 self._finish_model(model, location=False)
71
75
72 @web.authenticated
76 @web.authenticated
73 @json_errors
77 @json_errors
74 def patch(self, path=''):
78 def patch(self, path=''):
75 """PATCH renames a file or directory without re-uploading content."""
79 """PATCH renames a file or directory without re-uploading content."""
76 cm = self.contents_manager
80 cm = self.contents_manager
77 model = self.get_json_body()
81 model = self.get_json_body()
78 if model is None:
82 if model is None:
79 raise web.HTTPError(400, u'JSON body missing')
83 raise web.HTTPError(400, u'JSON body missing')
80 model = cm.update(model, path)
84 model = cm.update(model, path)
81 self._finish_model(model)
85 self._finish_model(model)
82
86
83 def _copy(self, copy_from, copy_to=None):
87 def _copy(self, copy_from, copy_to=None):
84 """Copy a file, optionally specifying a target directory."""
88 """Copy a file, optionally specifying a target directory."""
85 self.log.info(u"Copying {copy_from} to {copy_to}".format(
89 self.log.info(u"Copying {copy_from} to {copy_to}".format(
86 copy_from=copy_from,
90 copy_from=copy_from,
87 copy_to=copy_to or '',
91 copy_to=copy_to or '',
88 ))
92 ))
89 model = self.contents_manager.copy(copy_from, copy_to)
93 model = self.contents_manager.copy(copy_from, copy_to)
90 self.set_status(201)
94 self.set_status(201)
91 self._finish_model(model)
95 self._finish_model(model)
92
96
93 def _upload(self, model, path):
97 def _upload(self, model, path):
94 """Handle upload of a new file to path"""
98 """Handle upload of a new file to path"""
95 self.log.info(u"Uploading file to %s", path)
99 self.log.info(u"Uploading file to %s", path)
96 model = self.contents_manager.new(model, path)
100 model = self.contents_manager.new(model, path)
97 self.set_status(201)
101 self.set_status(201)
98 self._finish_model(model)
102 self._finish_model(model)
99
103
100 def _new_untitled(self, path, type='', ext=''):
104 def _new_untitled(self, path, type='', ext=''):
101 """Create a new, empty untitled entity"""
105 """Create a new, empty untitled entity"""
102 self.log.info(u"Creating new %s in %s", type or 'file', path)
106 self.log.info(u"Creating new %s in %s", type or 'file', path)
103 model = self.contents_manager.new_untitled(path=path, type=type, ext=ext)
107 model = self.contents_manager.new_untitled(path=path, type=type, ext=ext)
104 self.set_status(201)
108 self.set_status(201)
105 self._finish_model(model)
109 self._finish_model(model)
106
110
107 def _save(self, model, path):
111 def _save(self, model, path):
108 """Save an existing file."""
112 """Save an existing file."""
109 self.log.info(u"Saving file at %s", path)
113 self.log.info(u"Saving file at %s", path)
110 model = self.contents_manager.save(model, path)
114 model = self.contents_manager.save(model, path)
111 self._finish_model(model)
115 self._finish_model(model)
112
116
113 @web.authenticated
117 @web.authenticated
114 @json_errors
118 @json_errors
115 def post(self, path=''):
119 def post(self, path=''):
116 """Create a new file in the specified path.
120 """Create a new file in the specified path.
117
121
118 POST creates new files. The server always decides on the name.
122 POST creates new files. The server always decides on the name.
119
123
120 POST /api/contents/path
124 POST /api/contents/path
121 New untitled, empty file or directory.
125 New untitled, empty file or directory.
122 POST /api/contents/path
126 POST /api/contents/path
123 with body {"copy_from" : "/path/to/OtherNotebook.ipynb"}
127 with body {"copy_from" : "/path/to/OtherNotebook.ipynb"}
124 New copy of OtherNotebook in path
128 New copy of OtherNotebook in path
125 """
129 """
126
130
127 cm = self.contents_manager
131 cm = self.contents_manager
128
132
129 if cm.file_exists(path):
133 if cm.file_exists(path):
130 raise web.HTTPError(400, "Cannot POST to files, use PUT instead.")
134 raise web.HTTPError(400, "Cannot POST to files, use PUT instead.")
131
135
132 if not cm.dir_exists(path):
136 if not cm.dir_exists(path):
133 raise web.HTTPError(404, "No such directory: %s" % path)
137 raise web.HTTPError(404, "No such directory: %s" % path)
134
138
135 model = self.get_json_body()
139 model = self.get_json_body()
136
140
137 if model is not None:
141 if model is not None:
138 copy_from = model.get('copy_from')
142 copy_from = model.get('copy_from')
139 ext = model.get('ext', '')
143 ext = model.get('ext', '')
140 type = model.get('type', '')
144 type = model.get('type', '')
141 if copy_from:
145 if copy_from:
142 self._copy(copy_from, path)
146 self._copy(copy_from, path)
143 else:
147 else:
144 self._new_untitled(path, type=type, ext=ext)
148 self._new_untitled(path, type=type, ext=ext)
145 else:
149 else:
146 self._new_untitled(path)
150 self._new_untitled(path)
147
151
148 @web.authenticated
152 @web.authenticated
149 @json_errors
153 @json_errors
150 def put(self, path=''):
154 def put(self, path=''):
151 """Saves the file in the location specified by name and path.
155 """Saves the file in the location specified by name and path.
152
156
153 PUT is very similar to POST, but the requester specifies the name,
157 PUT is very similar to POST, but the requester specifies the name,
154 whereas with POST, the server picks the name.
158 whereas with POST, the server picks the name.
155
159
156 PUT /api/contents/path/Name.ipynb
160 PUT /api/contents/path/Name.ipynb
157 Save notebook at ``path/Name.ipynb``. Notebook structure is specified
161 Save notebook at ``path/Name.ipynb``. Notebook structure is specified
158 in `content` key of JSON request body. If content is not specified,
162 in `content` key of JSON request body. If content is not specified,
159 create a new empty notebook.
163 create a new empty notebook.
160 """
164 """
161 model = self.get_json_body()
165 model = self.get_json_body()
162 if model:
166 if model:
163 if model.get('copy_from'):
167 if model.get('copy_from'):
164 raise web.HTTPError(400, "Cannot copy with PUT, only POST")
168 raise web.HTTPError(400, "Cannot copy with PUT, only POST")
165 if self.contents_manager.file_exists(path):
169 if self.contents_manager.file_exists(path):
166 self._save(model, path)
170 self._save(model, path)
167 else:
171 else:
168 self._upload(model, path)
172 self._upload(model, path)
169 else:
173 else:
170 self._new_untitled(path)
174 self._new_untitled(path)
171
175
172 @web.authenticated
176 @web.authenticated
173 @json_errors
177 @json_errors
174 def delete(self, path=''):
178 def delete(self, path=''):
175 """delete a file in the given path"""
179 """delete a file in the given path"""
176 cm = self.contents_manager
180 cm = self.contents_manager
177 self.log.warn('delete %s', path)
181 self.log.warn('delete %s', path)
178 cm.delete(path)
182 cm.delete(path)
179 self.set_status(204)
183 self.set_status(204)
180 self.finish()
184 self.finish()
181
185
182
186
183 class CheckpointsHandler(IPythonHandler):
187 class CheckpointsHandler(IPythonHandler):
184
188
185 SUPPORTED_METHODS = ('GET', 'POST')
189 SUPPORTED_METHODS = ('GET', 'POST')
186
190
187 @web.authenticated
191 @web.authenticated
188 @json_errors
192 @json_errors
189 def get(self, path=''):
193 def get(self, path=''):
190 """get lists checkpoints for a file"""
194 """get lists checkpoints for a file"""
191 cm = self.contents_manager
195 cm = self.contents_manager
192 checkpoints = cm.list_checkpoints(path)
196 checkpoints = cm.list_checkpoints(path)
193 data = json.dumps(checkpoints, default=date_default)
197 data = json.dumps(checkpoints, default=date_default)
194 self.finish(data)
198 self.finish(data)
195
199
196 @web.authenticated
200 @web.authenticated
197 @json_errors
201 @json_errors
198 def post(self, path=''):
202 def post(self, path=''):
199 """post creates a new checkpoint"""
203 """post creates a new checkpoint"""
200 cm = self.contents_manager
204 cm = self.contents_manager
201 checkpoint = cm.create_checkpoint(path)
205 checkpoint = cm.create_checkpoint(path)
202 data = json.dumps(checkpoint, default=date_default)
206 data = json.dumps(checkpoint, default=date_default)
203 location = url_path_join(self.base_url, 'api/contents',
207 location = url_path_join(self.base_url, 'api/contents',
204 path, 'checkpoints', checkpoint['id'])
208 path, 'checkpoints', checkpoint['id'])
205 self.set_header('Location', url_escape(location))
209 self.set_header('Location', url_escape(location))
206 self.set_status(201)
210 self.set_status(201)
207 self.finish(data)
211 self.finish(data)
208
212
209
213
210 class ModifyCheckpointsHandler(IPythonHandler):
214 class ModifyCheckpointsHandler(IPythonHandler):
211
215
212 SUPPORTED_METHODS = ('POST', 'DELETE')
216 SUPPORTED_METHODS = ('POST', 'DELETE')
213
217
214 @web.authenticated
218 @web.authenticated
215 @json_errors
219 @json_errors
216 def post(self, path, checkpoint_id):
220 def post(self, path, checkpoint_id):
217 """post restores a file from a checkpoint"""
221 """post restores a file from a checkpoint"""
218 cm = self.contents_manager
222 cm = self.contents_manager
219 cm.restore_checkpoint(checkpoint_id, path)
223 cm.restore_checkpoint(checkpoint_id, path)
220 self.set_status(204)
224 self.set_status(204)
221 self.finish()
225 self.finish()
222
226
223 @web.authenticated
227 @web.authenticated
224 @json_errors
228 @json_errors
225 def delete(self, path, checkpoint_id):
229 def delete(self, path, checkpoint_id):
226 """delete clears a checkpoint for a given file"""
230 """delete clears a checkpoint for a given file"""
227 cm = self.contents_manager
231 cm = self.contents_manager
228 cm.delete_checkpoint(checkpoint_id, path)
232 cm.delete_checkpoint(checkpoint_id, path)
229 self.set_status(204)
233 self.set_status(204)
230 self.finish()
234 self.finish()
231
235
232
236
233 class NotebooksRedirectHandler(IPythonHandler):
237 class NotebooksRedirectHandler(IPythonHandler):
234 """Redirect /api/notebooks to /api/contents"""
238 """Redirect /api/notebooks to /api/contents"""
235 SUPPORTED_METHODS = ('GET', 'PUT', 'PATCH', 'POST', 'DELETE')
239 SUPPORTED_METHODS = ('GET', 'PUT', 'PATCH', 'POST', 'DELETE')
236
240
237 def get(self, path):
241 def get(self, path):
238 self.log.warn("/api/notebooks is deprecated, use /api/contents")
242 self.log.warn("/api/notebooks is deprecated, use /api/contents")
239 self.redirect(url_path_join(
243 self.redirect(url_path_join(
240 self.base_url,
244 self.base_url,
241 'api/contents',
245 'api/contents',
242 path
246 path
243 ))
247 ))
244
248
245 put = patch = post = delete = get
249 put = patch = post = delete = get
246
250
247
251
248 #-----------------------------------------------------------------------------
252 #-----------------------------------------------------------------------------
249 # URL to handler mappings
253 # URL to handler mappings
250 #-----------------------------------------------------------------------------
254 #-----------------------------------------------------------------------------
251
255
252
256
253 _checkpoint_id_regex = r"(?P<checkpoint_id>[\w-]+)"
257 _checkpoint_id_regex = r"(?P<checkpoint_id>[\w-]+)"
254
258
255 default_handlers = [
259 default_handlers = [
256 (r"/api/contents%s/checkpoints" % path_regex, CheckpointsHandler),
260 (r"/api/contents%s/checkpoints" % path_regex, CheckpointsHandler),
257 (r"/api/contents%s/checkpoints/%s" % (path_regex, _checkpoint_id_regex),
261 (r"/api/contents%s/checkpoints/%s" % (path_regex, _checkpoint_id_regex),
258 ModifyCheckpointsHandler),
262 ModifyCheckpointsHandler),
259 (r"/api/contents%s" % path_regex, ContentsHandler),
263 (r"/api/contents%s" % path_regex, ContentsHandler),
260 (r"/api/notebooks/?(.*)", NotebooksRedirectHandler),
264 (r"/api/notebooks/?(.*)", NotebooksRedirectHandler),
261 ]
265 ]
@@ -1,373 +1,373 b''
1 """A base class for contents managers."""
1 """A base class for contents managers."""
2
2
3 # Copyright (c) IPython Development Team.
3 # Copyright (c) IPython Development Team.
4 # Distributed under the terms of the Modified BSD License.
4 # Distributed under the terms of the Modified BSD License.
5
5
6 from fnmatch import fnmatch
6 from fnmatch import fnmatch
7 import itertools
7 import itertools
8 import json
8 import json
9 import os
9 import os
10
10
11 from tornado.web import HTTPError
11 from tornado.web import HTTPError
12
12
13 from IPython.config.configurable import LoggingConfigurable
13 from IPython.config.configurable import LoggingConfigurable
14 from IPython.nbformat import sign, validate, ValidationError
14 from IPython.nbformat import sign, validate, ValidationError
15 from IPython.nbformat.v4 import new_notebook
15 from IPython.nbformat.v4 import new_notebook
16 from IPython.utils.traitlets import Instance, Unicode, List
16 from IPython.utils.traitlets import Instance, Unicode, List
17
17
18
18
19 class ContentsManager(LoggingConfigurable):
19 class ContentsManager(LoggingConfigurable):
20 """Base class for serving files and directories.
20 """Base class for serving files and directories.
21
21
22 This serves any text or binary file,
22 This serves any text or binary file,
23 as well as directories,
23 as well as directories,
24 with special handling for JSON notebook documents.
24 with special handling for JSON notebook documents.
25
25
26 Most APIs take a path argument,
26 Most APIs take a path argument,
27 which is always an API-style unicode path,
27 which is always an API-style unicode path,
28 and always refers to a directory.
28 and always refers to a directory.
29
29
30 - unicode, not url-escaped
30 - unicode, not url-escaped
31 - '/'-separated
31 - '/'-separated
32 - leading and trailing '/' will be stripped
32 - leading and trailing '/' will be stripped
33 - if unspecified, path defaults to '',
33 - if unspecified, path defaults to '',
34 indicating the root path.
34 indicating the root path.
35
35
36 """
36 """
37
37
38 notary = Instance(sign.NotebookNotary)
38 notary = Instance(sign.NotebookNotary)
39 def _notary_default(self):
39 def _notary_default(self):
40 return sign.NotebookNotary(parent=self)
40 return sign.NotebookNotary(parent=self)
41
41
42 hide_globs = List(Unicode, [
42 hide_globs = List(Unicode, [
43 u'__pycache__', '*.pyc', '*.pyo',
43 u'__pycache__', '*.pyc', '*.pyo',
44 '.DS_Store', '*.so', '*.dylib', '*~',
44 '.DS_Store', '*.so', '*.dylib', '*~',
45 ], config=True, help="""
45 ], config=True, help="""
46 Glob patterns to hide in file and directory listings.
46 Glob patterns to hide in file and directory listings.
47 """)
47 """)
48
48
49 untitled_notebook = Unicode("Untitled", config=True,
49 untitled_notebook = Unicode("Untitled", config=True,
50 help="The base name used when creating untitled notebooks."
50 help="The base name used when creating untitled notebooks."
51 )
51 )
52
52
53 untitled_file = Unicode("untitled", config=True,
53 untitled_file = Unicode("untitled", config=True,
54 help="The base name used when creating untitled files."
54 help="The base name used when creating untitled files."
55 )
55 )
56
56
57 untitled_directory = Unicode("Untitled Folder", config=True,
57 untitled_directory = Unicode("Untitled Folder", config=True,
58 help="The base name used when creating untitled directories."
58 help="The base name used when creating untitled directories."
59 )
59 )
60
60
61 # ContentsManager API part 1: methods that must be
61 # ContentsManager API part 1: methods that must be
62 # implemented in subclasses.
62 # implemented in subclasses.
63
63
64 def dir_exists(self, path):
64 def dir_exists(self, path):
65 """Does the API-style path (directory) actually exist?
65 """Does the API-style path (directory) actually exist?
66
66
67 Like os.path.isdir
67 Like os.path.isdir
68
68
69 Override this method in subclasses.
69 Override this method in subclasses.
70
70
71 Parameters
71 Parameters
72 ----------
72 ----------
73 path : string
73 path : string
74 The path to check
74 The path to check
75
75
76 Returns
76 Returns
77 -------
77 -------
78 exists : bool
78 exists : bool
79 Whether the path does indeed exist.
79 Whether the path does indeed exist.
80 """
80 """
81 raise NotImplementedError
81 raise NotImplementedError
82
82
83 def is_hidden(self, path):
83 def is_hidden(self, path):
84 """Does the API style path correspond to a hidden directory or file?
84 """Does the API style path correspond to a hidden directory or file?
85
85
86 Parameters
86 Parameters
87 ----------
87 ----------
88 path : string
88 path : string
89 The path to check. This is an API path (`/` separated,
89 The path to check. This is an API path (`/` separated,
90 relative to root dir).
90 relative to root dir).
91
91
92 Returns
92 Returns
93 -------
93 -------
94 hidden : bool
94 hidden : bool
95 Whether the path is hidden.
95 Whether the path is hidden.
96
96
97 """
97 """
98 raise NotImplementedError
98 raise NotImplementedError
99
99
100 def file_exists(self, path=''):
100 def file_exists(self, path=''):
101 """Does a file exist at the given path?
101 """Does a file exist at the given path?
102
102
103 Like os.path.isfile
103 Like os.path.isfile
104
104
105 Override this method in subclasses.
105 Override this method in subclasses.
106
106
107 Parameters
107 Parameters
108 ----------
108 ----------
109 name : string
109 name : string
110 The name of the file you are checking.
110 The name of the file you are checking.
111 path : string
111 path : string
112 The relative path to the file's directory (with '/' as separator)
112 The relative path to the file's directory (with '/' as separator)
113
113
114 Returns
114 Returns
115 -------
115 -------
116 exists : bool
116 exists : bool
117 Whether the file exists.
117 Whether the file exists.
118 """
118 """
119 raise NotImplementedError('must be implemented in a subclass')
119 raise NotImplementedError('must be implemented in a subclass')
120
120
121 def exists(self, path):
121 def exists(self, path):
122 """Does a file or directory exist at the given path?
122 """Does a file or directory exist at the given path?
123
123
124 Like os.path.exists
124 Like os.path.exists
125
125
126 Parameters
126 Parameters
127 ----------
127 ----------
128 path : string
128 path : string
129 The relative path to the file's directory (with '/' as separator)
129 The relative path to the file's directory (with '/' as separator)
130
130
131 Returns
131 Returns
132 -------
132 -------
133 exists : bool
133 exists : bool
134 Whether the target exists.
134 Whether the target exists.
135 """
135 """
136 return self.file_exists(path) or self.dir_exists(path)
136 return self.file_exists(path) or self.dir_exists(path)
137
137
138 def get_model(self, path, content=True, type_=None):
138 def get_model(self, path, content=True, type_=None, format=None):
139 """Get the model of a file or directory with or without content."""
139 """Get the model of a file or directory with or without content."""
140 raise NotImplementedError('must be implemented in a subclass')
140 raise NotImplementedError('must be implemented in a subclass')
141
141
142 def save(self, model, path):
142 def save(self, model, path):
143 """Save the file or directory and return the model with no content."""
143 """Save the file or directory and return the model with no content."""
144 raise NotImplementedError('must be implemented in a subclass')
144 raise NotImplementedError('must be implemented in a subclass')
145
145
146 def update(self, model, path):
146 def update(self, model, path):
147 """Update the file or directory and return the model with no content.
147 """Update the file or directory and return the model with no content.
148
148
149 For use in PATCH requests, to enable renaming a file without
149 For use in PATCH requests, to enable renaming a file without
150 re-uploading its contents. Only used for renaming at the moment.
150 re-uploading its contents. Only used for renaming at the moment.
151 """
151 """
152 raise NotImplementedError('must be implemented in a subclass')
152 raise NotImplementedError('must be implemented in a subclass')
153
153
154 def delete(self, path):
154 def delete(self, path):
155 """Delete file or directory by path."""
155 """Delete file or directory by path."""
156 raise NotImplementedError('must be implemented in a subclass')
156 raise NotImplementedError('must be implemented in a subclass')
157
157
158 def create_checkpoint(self, path):
158 def create_checkpoint(self, path):
159 """Create a checkpoint of the current state of a file
159 """Create a checkpoint of the current state of a file
160
160
161 Returns a checkpoint_id for the new checkpoint.
161 Returns a checkpoint_id for the new checkpoint.
162 """
162 """
163 raise NotImplementedError("must be implemented in a subclass")
163 raise NotImplementedError("must be implemented in a subclass")
164
164
165 def list_checkpoints(self, path):
165 def list_checkpoints(self, path):
166 """Return a list of checkpoints for a given file"""
166 """Return a list of checkpoints for a given file"""
167 return []
167 return []
168
168
169 def restore_checkpoint(self, checkpoint_id, path):
169 def restore_checkpoint(self, checkpoint_id, path):
170 """Restore a file from one of its checkpoints"""
170 """Restore a file from one of its checkpoints"""
171 raise NotImplementedError("must be implemented in a subclass")
171 raise NotImplementedError("must be implemented in a subclass")
172
172
173 def delete_checkpoint(self, checkpoint_id, path):
173 def delete_checkpoint(self, checkpoint_id, path):
174 """delete a checkpoint for a file"""
174 """delete a checkpoint for a file"""
175 raise NotImplementedError("must be implemented in a subclass")
175 raise NotImplementedError("must be implemented in a subclass")
176
176
177 # ContentsManager API part 2: methods that have useable default
177 # ContentsManager API part 2: methods that have useable default
178 # implementations, but can be overridden in subclasses.
178 # implementations, but can be overridden in subclasses.
179
179
180 def info_string(self):
180 def info_string(self):
181 return "Serving contents"
181 return "Serving contents"
182
182
183 def get_kernel_path(self, path, model=None):
183 def get_kernel_path(self, path, model=None):
184 """Return the API path for the kernel
184 """Return the API path for the kernel
185
185
186 KernelManagers can turn this value into a filesystem path,
186 KernelManagers can turn this value into a filesystem path,
187 or ignore it altogether.
187 or ignore it altogether.
188 """
188 """
189 return path
189 return path
190
190
191 def increment_filename(self, filename, path=''):
191 def increment_filename(self, filename, path=''):
192 """Increment a filename until it is unique.
192 """Increment a filename until it is unique.
193
193
194 Parameters
194 Parameters
195 ----------
195 ----------
196 filename : unicode
196 filename : unicode
197 The name of a file, including extension
197 The name of a file, including extension
198 path : unicode
198 path : unicode
199 The API path of the target's directory
199 The API path of the target's directory
200
200
201 Returns
201 Returns
202 -------
202 -------
203 name : unicode
203 name : unicode
204 A filename that is unique, based on the input filename.
204 A filename that is unique, based on the input filename.
205 """
205 """
206 path = path.strip('/')
206 path = path.strip('/')
207 basename, ext = os.path.splitext(filename)
207 basename, ext = os.path.splitext(filename)
208 for i in itertools.count():
208 for i in itertools.count():
209 name = u'{basename}{i}{ext}'.format(basename=basename, i=i,
209 name = u'{basename}{i}{ext}'.format(basename=basename, i=i,
210 ext=ext)
210 ext=ext)
211 if not self.exists(u'{}/{}'.format(path, name)):
211 if not self.exists(u'{}/{}'.format(path, name)):
212 break
212 break
213 return name
213 return name
214
214
215 def validate_notebook_model(self, model):
215 def validate_notebook_model(self, model):
216 """Add failed-validation message to model"""
216 """Add failed-validation message to model"""
217 try:
217 try:
218 validate(model['content'])
218 validate(model['content'])
219 except ValidationError as e:
219 except ValidationError as e:
220 model['message'] = u'Notebook Validation failed: {}:\n{}'.format(
220 model['message'] = u'Notebook Validation failed: {}:\n{}'.format(
221 e.message, json.dumps(e.instance, indent=1, default=lambda obj: '<UNKNOWN>'),
221 e.message, json.dumps(e.instance, indent=1, default=lambda obj: '<UNKNOWN>'),
222 )
222 )
223 return model
223 return model
224
224
225 def new_untitled(self, path='', type='', ext=''):
225 def new_untitled(self, path='', type='', ext=''):
226 """Create a new untitled file or directory in path
226 """Create a new untitled file or directory in path
227
227
228 path must be a directory
228 path must be a directory
229
229
230 File extension can be specified.
230 File extension can be specified.
231
231
232 Use `new` to create files with a fully specified path (including filename).
232 Use `new` to create files with a fully specified path (including filename).
233 """
233 """
234 path = path.strip('/')
234 path = path.strip('/')
235 if not self.dir_exists(path):
235 if not self.dir_exists(path):
236 raise HTTPError(404, 'No such directory: %s' % path)
236 raise HTTPError(404, 'No such directory: %s' % path)
237
237
238 model = {}
238 model = {}
239 if type:
239 if type:
240 model['type'] = type
240 model['type'] = type
241
241
242 if ext == '.ipynb':
242 if ext == '.ipynb':
243 model.setdefault('type', 'notebook')
243 model.setdefault('type', 'notebook')
244 else:
244 else:
245 model.setdefault('type', 'file')
245 model.setdefault('type', 'file')
246
246
247 if model['type'] == 'directory':
247 if model['type'] == 'directory':
248 untitled = self.untitled_directory
248 untitled = self.untitled_directory
249 elif model['type'] == 'notebook':
249 elif model['type'] == 'notebook':
250 untitled = self.untitled_notebook
250 untitled = self.untitled_notebook
251 ext = '.ipynb'
251 ext = '.ipynb'
252 elif model['type'] == 'file':
252 elif model['type'] == 'file':
253 untitled = self.untitled_file
253 untitled = self.untitled_file
254 else:
254 else:
255 raise HTTPError(400, "Unexpected model type: %r" % model['type'])
255 raise HTTPError(400, "Unexpected model type: %r" % model['type'])
256
256
257 name = self.increment_filename(untitled + ext, path)
257 name = self.increment_filename(untitled + ext, path)
258 path = u'{0}/{1}'.format(path, name)
258 path = u'{0}/{1}'.format(path, name)
259 return self.new(model, path)
259 return self.new(model, path)
260
260
261 def new(self, model=None, path=''):
261 def new(self, model=None, path=''):
262 """Create a new file or directory and return its model with no content.
262 """Create a new file or directory and return its model with no content.
263
263
264 To create a new untitled entity in a directory, use `new_untitled`.
264 To create a new untitled entity in a directory, use `new_untitled`.
265 """
265 """
266 path = path.strip('/')
266 path = path.strip('/')
267 if model is None:
267 if model is None:
268 model = {}
268 model = {}
269
269
270 if path.endswith('.ipynb'):
270 if path.endswith('.ipynb'):
271 model.setdefault('type', 'notebook')
271 model.setdefault('type', 'notebook')
272 else:
272 else:
273 model.setdefault('type', 'file')
273 model.setdefault('type', 'file')
274
274
275 # no content, not a directory, so fill out new-file model
275 # no content, not a directory, so fill out new-file model
276 if 'content' not in model and model['type'] != 'directory':
276 if 'content' not in model and model['type'] != 'directory':
277 if model['type'] == 'notebook':
277 if model['type'] == 'notebook':
278 model['content'] = new_notebook()
278 model['content'] = new_notebook()
279 model['format'] = 'json'
279 model['format'] = 'json'
280 else:
280 else:
281 model['content'] = ''
281 model['content'] = ''
282 model['type'] = 'file'
282 model['type'] = 'file'
283 model['format'] = 'text'
283 model['format'] = 'text'
284
284
285 model = self.save(model, path)
285 model = self.save(model, path)
286 return model
286 return model
287
287
288 def copy(self, from_path, to_path=None):
288 def copy(self, from_path, to_path=None):
289 """Copy an existing file and return its new model.
289 """Copy an existing file and return its new model.
290
290
291 If to_path not specified, it will be the parent directory of from_path.
291 If to_path not specified, it will be the parent directory of from_path.
292 If to_path is a directory, filename will increment `from_path-Copy#.ext`.
292 If to_path is a directory, filename will increment `from_path-Copy#.ext`.
293
293
294 from_path must be a full path to a file.
294 from_path must be a full path to a file.
295 """
295 """
296 path = from_path.strip('/')
296 path = from_path.strip('/')
297 if '/' in path:
297 if '/' in path:
298 from_dir, from_name = path.rsplit('/', 1)
298 from_dir, from_name = path.rsplit('/', 1)
299 else:
299 else:
300 from_dir = ''
300 from_dir = ''
301 from_name = path
301 from_name = path
302
302
303 model = self.get_model(path)
303 model = self.get_model(path)
304 model.pop('path', None)
304 model.pop('path', None)
305 model.pop('name', None)
305 model.pop('name', None)
306 if model['type'] == 'directory':
306 if model['type'] == 'directory':
307 raise HTTPError(400, "Can't copy directories")
307 raise HTTPError(400, "Can't copy directories")
308
308
309 if not to_path:
309 if not to_path:
310 to_path = from_dir
310 to_path = from_dir
311 if self.dir_exists(to_path):
311 if self.dir_exists(to_path):
312 base, ext = os.path.splitext(from_name)
312 base, ext = os.path.splitext(from_name)
313 copy_name = u'{0}-Copy{1}'.format(base, ext)
313 copy_name = u'{0}-Copy{1}'.format(base, ext)
314 to_name = self.increment_filename(copy_name, to_path)
314 to_name = self.increment_filename(copy_name, to_path)
315 to_path = u'{0}/{1}'.format(to_path, to_name)
315 to_path = u'{0}/{1}'.format(to_path, to_name)
316
316
317 model = self.save(model, to_path)
317 model = self.save(model, to_path)
318 return model
318 return model
319
319
320 def log_info(self):
320 def log_info(self):
321 self.log.info(self.info_string())
321 self.log.info(self.info_string())
322
322
323 def trust_notebook(self, path):
323 def trust_notebook(self, path):
324 """Explicitly trust a notebook
324 """Explicitly trust a notebook
325
325
326 Parameters
326 Parameters
327 ----------
327 ----------
328 path : string
328 path : string
329 The path of a notebook
329 The path of a notebook
330 """
330 """
331 model = self.get_model(path)
331 model = self.get_model(path)
332 nb = model['content']
332 nb = model['content']
333 self.log.warn("Trusting notebook %s", path)
333 self.log.warn("Trusting notebook %s", path)
334 self.notary.mark_cells(nb, True)
334 self.notary.mark_cells(nb, True)
335 self.save(model, path)
335 self.save(model, path)
336
336
337 def check_and_sign(self, nb, path=''):
337 def check_and_sign(self, nb, path=''):
338 """Check for trusted cells, and sign the notebook.
338 """Check for trusted cells, and sign the notebook.
339
339
340 Called as a part of saving notebooks.
340 Called as a part of saving notebooks.
341
341
342 Parameters
342 Parameters
343 ----------
343 ----------
344 nb : dict
344 nb : dict
345 The notebook dict
345 The notebook dict
346 path : string
346 path : string
347 The notebook's path (for logging)
347 The notebook's path (for logging)
348 """
348 """
349 if self.notary.check_cells(nb):
349 if self.notary.check_cells(nb):
350 self.notary.sign(nb)
350 self.notary.sign(nb)
351 else:
351 else:
352 self.log.warn("Saving untrusted notebook %s", path)
352 self.log.warn("Saving untrusted notebook %s", path)
353
353
354 def mark_trusted_cells(self, nb, path=''):
354 def mark_trusted_cells(self, nb, path=''):
355 """Mark cells as trusted if the notebook signature matches.
355 """Mark cells as trusted if the notebook signature matches.
356
356
357 Called as a part of loading notebooks.
357 Called as a part of loading notebooks.
358
358
359 Parameters
359 Parameters
360 ----------
360 ----------
361 nb : dict
361 nb : dict
362 The notebook object (in current nbformat)
362 The notebook object (in current nbformat)
363 path : string
363 path : string
364 The notebook's path (for logging)
364 The notebook's path (for logging)
365 """
365 """
366 trusted = self.notary.check_signature(nb)
366 trusted = self.notary.check_signature(nb)
367 if not trusted:
367 if not trusted:
368 self.log.warn("Notebook %s is not trusted", path)
368 self.log.warn("Notebook %s is not trusted", path)
369 self.notary.mark_cells(nb, trusted)
369 self.notary.mark_cells(nb, trusted)
370
370
371 def should_list(self, name):
371 def should_list(self, name):
372 """Should this file/directory name be displayed in a listing?"""
372 """Should this file/directory name be displayed in a listing?"""
373 return not any(fnmatch(name, glob) for glob in self.hide_globs)
373 return not any(fnmatch(name, glob) for glob in self.hide_globs)
@@ -1,504 +1,513 b''
1 # coding: utf-8
1 # coding: utf-8
2 """Test the contents webservice API."""
2 """Test the contents webservice API."""
3
3
4 import base64
4 import base64
5 import io
5 import io
6 import json
6 import json
7 import os
7 import os
8 import shutil
8 import shutil
9 from unicodedata import normalize
9 from unicodedata import normalize
10
10
11 pjoin = os.path.join
11 pjoin = os.path.join
12
12
13 import requests
13 import requests
14
14
15 from IPython.html.utils import url_path_join, url_escape
15 from IPython.html.utils import url_path_join, url_escape
16 from IPython.html.tests.launchnotebook import NotebookTestBase, assert_http_error
16 from IPython.html.tests.launchnotebook import NotebookTestBase, assert_http_error
17 from IPython.nbformat import read, write, from_dict
17 from IPython.nbformat import read, write, from_dict
18 from IPython.nbformat.v4 import (
18 from IPython.nbformat.v4 import (
19 new_notebook, new_markdown_cell,
19 new_notebook, new_markdown_cell,
20 )
20 )
21 from IPython.nbformat import v2
21 from IPython.nbformat import v2
22 from IPython.utils import py3compat
22 from IPython.utils import py3compat
23 from IPython.utils.data import uniq_stable
23 from IPython.utils.data import uniq_stable
24
24
25
25
26 def notebooks_only(dir_model):
26 def notebooks_only(dir_model):
27 return [nb for nb in dir_model['content'] if nb['type']=='notebook']
27 return [nb for nb in dir_model['content'] if nb['type']=='notebook']
28
28
29 def dirs_only(dir_model):
29 def dirs_only(dir_model):
30 return [x for x in dir_model['content'] if x['type']=='directory']
30 return [x for x in dir_model['content'] if x['type']=='directory']
31
31
32
32
33 class API(object):
33 class API(object):
34 """Wrapper for contents API calls."""
34 """Wrapper for contents API calls."""
35 def __init__(self, base_url):
35 def __init__(self, base_url):
36 self.base_url = base_url
36 self.base_url = base_url
37
37
38 def _req(self, verb, path, body=None):
38 def _req(self, verb, path, body=None):
39 response = requests.request(verb,
39 response = requests.request(verb,
40 url_path_join(self.base_url, 'api/contents', path),
40 url_path_join(self.base_url, 'api/contents', path),
41 data=body,
41 data=body,
42 )
42 )
43 response.raise_for_status()
43 response.raise_for_status()
44 return response
44 return response
45
45
46 def list(self, path='/'):
46 def list(self, path='/'):
47 return self._req('GET', path)
47 return self._req('GET', path)
48
48
49 def read(self, path, type_=None):
49 def read(self, path, type_=None, format=None):
50 query = []
50 if type_ is not None:
51 if type_ is not None:
51 path += '?type=' + type_
52 query.append('type=' + type_)
53 if format is not None:
54 query.append('format=' + format)
55 if query:
56 path += '?' + '&'.join(query)
52 return self._req('GET', path)
57 return self._req('GET', path)
53
58
54 def create_untitled(self, path='/', ext='.ipynb'):
59 def create_untitled(self, path='/', ext='.ipynb'):
55 body = None
60 body = None
56 if ext:
61 if ext:
57 body = json.dumps({'ext': ext})
62 body = json.dumps({'ext': ext})
58 return self._req('POST', path, body)
63 return self._req('POST', path, body)
59
64
60 def mkdir_untitled(self, path='/'):
65 def mkdir_untitled(self, path='/'):
61 return self._req('POST', path, json.dumps({'type': 'directory'}))
66 return self._req('POST', path, json.dumps({'type': 'directory'}))
62
67
63 def copy(self, copy_from, path='/'):
68 def copy(self, copy_from, path='/'):
64 body = json.dumps({'copy_from':copy_from})
69 body = json.dumps({'copy_from':copy_from})
65 return self._req('POST', path, body)
70 return self._req('POST', path, body)
66
71
67 def create(self, path='/'):
72 def create(self, path='/'):
68 return self._req('PUT', path)
73 return self._req('PUT', path)
69
74
70 def upload(self, path, body):
75 def upload(self, path, body):
71 return self._req('PUT', path, body)
76 return self._req('PUT', path, body)
72
77
73 def mkdir_untitled(self, path='/'):
78 def mkdir_untitled(self, path='/'):
74 return self._req('POST', path, json.dumps({'type': 'directory'}))
79 return self._req('POST', path, json.dumps({'type': 'directory'}))
75
80
76 def mkdir(self, path='/'):
81 def mkdir(self, path='/'):
77 return self._req('PUT', path, json.dumps({'type': 'directory'}))
82 return self._req('PUT', path, json.dumps({'type': 'directory'}))
78
83
79 def copy_put(self, copy_from, path='/'):
84 def copy_put(self, copy_from, path='/'):
80 body = json.dumps({'copy_from':copy_from})
85 body = json.dumps({'copy_from':copy_from})
81 return self._req('PUT', path, body)
86 return self._req('PUT', path, body)
82
87
83 def save(self, path, body):
88 def save(self, path, body):
84 return self._req('PUT', path, body)
89 return self._req('PUT', path, body)
85
90
86 def delete(self, path='/'):
91 def delete(self, path='/'):
87 return self._req('DELETE', path)
92 return self._req('DELETE', path)
88
93
89 def rename(self, path, new_path):
94 def rename(self, path, new_path):
90 body = json.dumps({'path': new_path})
95 body = json.dumps({'path': new_path})
91 return self._req('PATCH', path, body)
96 return self._req('PATCH', path, body)
92
97
93 def get_checkpoints(self, path):
98 def get_checkpoints(self, path):
94 return self._req('GET', url_path_join(path, 'checkpoints'))
99 return self._req('GET', url_path_join(path, 'checkpoints'))
95
100
96 def new_checkpoint(self, path):
101 def new_checkpoint(self, path):
97 return self._req('POST', url_path_join(path, 'checkpoints'))
102 return self._req('POST', url_path_join(path, 'checkpoints'))
98
103
99 def restore_checkpoint(self, path, checkpoint_id):
104 def restore_checkpoint(self, path, checkpoint_id):
100 return self._req('POST', url_path_join(path, 'checkpoints', checkpoint_id))
105 return self._req('POST', url_path_join(path, 'checkpoints', checkpoint_id))
101
106
102 def delete_checkpoint(self, path, checkpoint_id):
107 def delete_checkpoint(self, path, checkpoint_id):
103 return self._req('DELETE', url_path_join(path, 'checkpoints', checkpoint_id))
108 return self._req('DELETE', url_path_join(path, 'checkpoints', checkpoint_id))
104
109
105 class APITest(NotebookTestBase):
110 class APITest(NotebookTestBase):
106 """Test the kernels web service API"""
111 """Test the kernels web service API"""
107 dirs_nbs = [('', 'inroot'),
112 dirs_nbs = [('', 'inroot'),
108 ('Directory with spaces in', 'inspace'),
113 ('Directory with spaces in', 'inspace'),
109 (u'unicodΓ©', 'innonascii'),
114 (u'unicodΓ©', 'innonascii'),
110 ('foo', 'a'),
115 ('foo', 'a'),
111 ('foo', 'b'),
116 ('foo', 'b'),
112 ('foo', 'name with spaces'),
117 ('foo', 'name with spaces'),
113 ('foo', u'unicodΓ©'),
118 ('foo', u'unicodΓ©'),
114 ('foo/bar', 'baz'),
119 ('foo/bar', 'baz'),
115 ('ordering', 'A'),
120 ('ordering', 'A'),
116 ('ordering', 'b'),
121 ('ordering', 'b'),
117 ('ordering', 'C'),
122 ('ordering', 'C'),
118 (u'Γ₯ b', u'Γ§ d'),
123 (u'Γ₯ b', u'Γ§ d'),
119 ]
124 ]
120 hidden_dirs = ['.hidden', '__pycache__']
125 hidden_dirs = ['.hidden', '__pycache__']
121
126
122 dirs = uniq_stable([py3compat.cast_unicode(d) for (d,n) in dirs_nbs])
127 dirs = uniq_stable([py3compat.cast_unicode(d) for (d,n) in dirs_nbs])
123 del dirs[0] # remove ''
128 del dirs[0] # remove ''
124 top_level_dirs = {normalize('NFC', d.split('/')[0]) for d in dirs}
129 top_level_dirs = {normalize('NFC', d.split('/')[0]) for d in dirs}
125
130
126 @staticmethod
131 @staticmethod
127 def _blob_for_name(name):
132 def _blob_for_name(name):
128 return name.encode('utf-8') + b'\xFF'
133 return name.encode('utf-8') + b'\xFF'
129
134
130 @staticmethod
135 @staticmethod
131 def _txt_for_name(name):
136 def _txt_for_name(name):
132 return u'%s text file' % name
137 return u'%s text file' % name
133
138
134 def setUp(self):
139 def setUp(self):
135 nbdir = self.notebook_dir.name
140 nbdir = self.notebook_dir.name
136 self.blob = os.urandom(100)
141 self.blob = os.urandom(100)
137 self.b64_blob = base64.encodestring(self.blob).decode('ascii')
142 self.b64_blob = base64.encodestring(self.blob).decode('ascii')
138
143
139 for d in (self.dirs + self.hidden_dirs):
144 for d in (self.dirs + self.hidden_dirs):
140 d.replace('/', os.sep)
145 d.replace('/', os.sep)
141 if not os.path.isdir(pjoin(nbdir, d)):
146 if not os.path.isdir(pjoin(nbdir, d)):
142 os.mkdir(pjoin(nbdir, d))
147 os.mkdir(pjoin(nbdir, d))
143
148
144 for d, name in self.dirs_nbs:
149 for d, name in self.dirs_nbs:
145 d = d.replace('/', os.sep)
150 d = d.replace('/', os.sep)
146 # create a notebook
151 # create a notebook
147 with io.open(pjoin(nbdir, d, '%s.ipynb' % name), 'w',
152 with io.open(pjoin(nbdir, d, '%s.ipynb' % name), 'w',
148 encoding='utf-8') as f:
153 encoding='utf-8') as f:
149 nb = new_notebook()
154 nb = new_notebook()
150 write(nb, f, version=4)
155 write(nb, f, version=4)
151
156
152 # create a text file
157 # create a text file
153 with io.open(pjoin(nbdir, d, '%s.txt' % name), 'w',
158 with io.open(pjoin(nbdir, d, '%s.txt' % name), 'w',
154 encoding='utf-8') as f:
159 encoding='utf-8') as f:
155 f.write(self._txt_for_name(name))
160 f.write(self._txt_for_name(name))
156
161
157 # create a binary file
162 # create a binary file
158 with io.open(pjoin(nbdir, d, '%s.blob' % name), 'wb') as f:
163 with io.open(pjoin(nbdir, d, '%s.blob' % name), 'wb') as f:
159 f.write(self._blob_for_name(name))
164 f.write(self._blob_for_name(name))
160
165
161 self.api = API(self.base_url())
166 self.api = API(self.base_url())
162
167
163 def tearDown(self):
168 def tearDown(self):
164 nbdir = self.notebook_dir.name
169 nbdir = self.notebook_dir.name
165
170
166 for dname in (list(self.top_level_dirs) + self.hidden_dirs):
171 for dname in (list(self.top_level_dirs) + self.hidden_dirs):
167 shutil.rmtree(pjoin(nbdir, dname), ignore_errors=True)
172 shutil.rmtree(pjoin(nbdir, dname), ignore_errors=True)
168
173
169 if os.path.isfile(pjoin(nbdir, 'inroot.ipynb')):
174 if os.path.isfile(pjoin(nbdir, 'inroot.ipynb')):
170 os.unlink(pjoin(nbdir, 'inroot.ipynb'))
175 os.unlink(pjoin(nbdir, 'inroot.ipynb'))
171
176
172 def test_list_notebooks(self):
177 def test_list_notebooks(self):
173 nbs = notebooks_only(self.api.list().json())
178 nbs = notebooks_only(self.api.list().json())
174 self.assertEqual(len(nbs), 1)
179 self.assertEqual(len(nbs), 1)
175 self.assertEqual(nbs[0]['name'], 'inroot.ipynb')
180 self.assertEqual(nbs[0]['name'], 'inroot.ipynb')
176
181
177 nbs = notebooks_only(self.api.list('/Directory with spaces in/').json())
182 nbs = notebooks_only(self.api.list('/Directory with spaces in/').json())
178 self.assertEqual(len(nbs), 1)
183 self.assertEqual(len(nbs), 1)
179 self.assertEqual(nbs[0]['name'], 'inspace.ipynb')
184 self.assertEqual(nbs[0]['name'], 'inspace.ipynb')
180
185
181 nbs = notebooks_only(self.api.list(u'/unicodΓ©/').json())
186 nbs = notebooks_only(self.api.list(u'/unicodΓ©/').json())
182 self.assertEqual(len(nbs), 1)
187 self.assertEqual(len(nbs), 1)
183 self.assertEqual(nbs[0]['name'], 'innonascii.ipynb')
188 self.assertEqual(nbs[0]['name'], 'innonascii.ipynb')
184 self.assertEqual(nbs[0]['path'], u'unicodΓ©/innonascii.ipynb')
189 self.assertEqual(nbs[0]['path'], u'unicodΓ©/innonascii.ipynb')
185
190
186 nbs = notebooks_only(self.api.list('/foo/bar/').json())
191 nbs = notebooks_only(self.api.list('/foo/bar/').json())
187 self.assertEqual(len(nbs), 1)
192 self.assertEqual(len(nbs), 1)
188 self.assertEqual(nbs[0]['name'], 'baz.ipynb')
193 self.assertEqual(nbs[0]['name'], 'baz.ipynb')
189 self.assertEqual(nbs[0]['path'], 'foo/bar/baz.ipynb')
194 self.assertEqual(nbs[0]['path'], 'foo/bar/baz.ipynb')
190
195
191 nbs = notebooks_only(self.api.list('foo').json())
196 nbs = notebooks_only(self.api.list('foo').json())
192 self.assertEqual(len(nbs), 4)
197 self.assertEqual(len(nbs), 4)
193 nbnames = { normalize('NFC', n['name']) for n in nbs }
198 nbnames = { normalize('NFC', n['name']) for n in nbs }
194 expected = [ u'a.ipynb', u'b.ipynb', u'name with spaces.ipynb', u'unicodΓ©.ipynb']
199 expected = [ u'a.ipynb', u'b.ipynb', u'name with spaces.ipynb', u'unicodΓ©.ipynb']
195 expected = { normalize('NFC', name) for name in expected }
200 expected = { normalize('NFC', name) for name in expected }
196 self.assertEqual(nbnames, expected)
201 self.assertEqual(nbnames, expected)
197
202
198 nbs = notebooks_only(self.api.list('ordering').json())
203 nbs = notebooks_only(self.api.list('ordering').json())
199 nbnames = [n['name'] for n in nbs]
204 nbnames = [n['name'] for n in nbs]
200 expected = ['A.ipynb', 'b.ipynb', 'C.ipynb']
205 expected = ['A.ipynb', 'b.ipynb', 'C.ipynb']
201 self.assertEqual(nbnames, expected)
206 self.assertEqual(nbnames, expected)
202
207
203 def test_list_dirs(self):
208 def test_list_dirs(self):
204 print(self.api.list().json())
209 print(self.api.list().json())
205 dirs = dirs_only(self.api.list().json())
210 dirs = dirs_only(self.api.list().json())
206 dir_names = {normalize('NFC', d['name']) for d in dirs}
211 dir_names = {normalize('NFC', d['name']) for d in dirs}
207 print(dir_names)
212 print(dir_names)
208 print(self.top_level_dirs)
213 print(self.top_level_dirs)
209 self.assertEqual(dir_names, self.top_level_dirs) # Excluding hidden dirs
214 self.assertEqual(dir_names, self.top_level_dirs) # Excluding hidden dirs
210
215
211 def test_list_nonexistant_dir(self):
216 def test_list_nonexistant_dir(self):
212 with assert_http_error(404):
217 with assert_http_error(404):
213 self.api.list('nonexistant')
218 self.api.list('nonexistant')
214
219
215 def test_get_nb_contents(self):
220 def test_get_nb_contents(self):
216 for d, name in self.dirs_nbs:
221 for d, name in self.dirs_nbs:
217 path = url_path_join(d, name + '.ipynb')
222 path = url_path_join(d, name + '.ipynb')
218 nb = self.api.read(path).json()
223 nb = self.api.read(path).json()
219 self.assertEqual(nb['name'], u'%s.ipynb' % name)
224 self.assertEqual(nb['name'], u'%s.ipynb' % name)
220 self.assertEqual(nb['path'], path)
225 self.assertEqual(nb['path'], path)
221 self.assertEqual(nb['type'], 'notebook')
226 self.assertEqual(nb['type'], 'notebook')
222 self.assertIn('content', nb)
227 self.assertIn('content', nb)
223 self.assertEqual(nb['format'], 'json')
228 self.assertEqual(nb['format'], 'json')
224 self.assertIn('content', nb)
229 self.assertIn('content', nb)
225 self.assertIn('metadata', nb['content'])
230 self.assertIn('metadata', nb['content'])
226 self.assertIsInstance(nb['content']['metadata'], dict)
231 self.assertIsInstance(nb['content']['metadata'], dict)
227
232
228 def test_get_contents_no_such_file(self):
233 def test_get_contents_no_such_file(self):
229 # Name that doesn't exist - should be a 404
234 # Name that doesn't exist - should be a 404
230 with assert_http_error(404):
235 with assert_http_error(404):
231 self.api.read('foo/q.ipynb')
236 self.api.read('foo/q.ipynb')
232
237
233 def test_get_text_file_contents(self):
238 def test_get_text_file_contents(self):
234 for d, name in self.dirs_nbs:
239 for d, name in self.dirs_nbs:
235 path = url_path_join(d, name + '.txt')
240 path = url_path_join(d, name + '.txt')
236 model = self.api.read(path).json()
241 model = self.api.read(path).json()
237 self.assertEqual(model['name'], u'%s.txt' % name)
242 self.assertEqual(model['name'], u'%s.txt' % name)
238 self.assertEqual(model['path'], path)
243 self.assertEqual(model['path'], path)
239 self.assertIn('content', model)
244 self.assertIn('content', model)
240 self.assertEqual(model['format'], 'text')
245 self.assertEqual(model['format'], 'text')
241 self.assertEqual(model['type'], 'file')
246 self.assertEqual(model['type'], 'file')
242 self.assertEqual(model['content'], self._txt_for_name(name))
247 self.assertEqual(model['content'], self._txt_for_name(name))
243
248
244 # Name that doesn't exist - should be a 404
249 # Name that doesn't exist - should be a 404
245 with assert_http_error(404):
250 with assert_http_error(404):
246 self.api.read('foo/q.txt')
251 self.api.read('foo/q.txt')
247
252
253 # Specifying format=text should fail on a non-UTF-8 file
254 with assert_http_error(400):
255 self.api.read('foo/bar/baz.blob', type_='file', format='text')
256
248 def test_get_binary_file_contents(self):
257 def test_get_binary_file_contents(self):
249 for d, name in self.dirs_nbs:
258 for d, name in self.dirs_nbs:
250 path = url_path_join(d, name + '.blob')
259 path = url_path_join(d, name + '.blob')
251 model = self.api.read(path).json()
260 model = self.api.read(path).json()
252 self.assertEqual(model['name'], u'%s.blob' % name)
261 self.assertEqual(model['name'], u'%s.blob' % name)
253 self.assertEqual(model['path'], path)
262 self.assertEqual(model['path'], path)
254 self.assertIn('content', model)
263 self.assertIn('content', model)
255 self.assertEqual(model['format'], 'base64')
264 self.assertEqual(model['format'], 'base64')
256 self.assertEqual(model['type'], 'file')
265 self.assertEqual(model['type'], 'file')
257 b64_data = base64.encodestring(self._blob_for_name(name)).decode('ascii')
266 b64_data = base64.encodestring(self._blob_for_name(name)).decode('ascii')
258 self.assertEqual(model['content'], b64_data)
267 self.assertEqual(model['content'], b64_data)
259
268
260 # Name that doesn't exist - should be a 404
269 # Name that doesn't exist - should be a 404
261 with assert_http_error(404):
270 with assert_http_error(404):
262 self.api.read('foo/q.txt')
271 self.api.read('foo/q.txt')
263
272
264 def test_get_bad_type(self):
273 def test_get_bad_type(self):
265 with assert_http_error(400):
274 with assert_http_error(400):
266 self.api.read(u'unicodΓ©', type_='file') # this is a directory
275 self.api.read(u'unicodΓ©', type_='file') # this is a directory
267
276
268 with assert_http_error(400):
277 with assert_http_error(400):
269 self.api.read(u'unicodΓ©/innonascii.ipynb', type_='directory')
278 self.api.read(u'unicodΓ©/innonascii.ipynb', type_='directory')
270
279
271 def _check_created(self, resp, path, type='notebook'):
280 def _check_created(self, resp, path, type='notebook'):
272 self.assertEqual(resp.status_code, 201)
281 self.assertEqual(resp.status_code, 201)
273 location_header = py3compat.str_to_unicode(resp.headers['Location'])
282 location_header = py3compat.str_to_unicode(resp.headers['Location'])
274 self.assertEqual(location_header, url_escape(url_path_join(u'/api/contents', path)))
283 self.assertEqual(location_header, url_escape(url_path_join(u'/api/contents', path)))
275 rjson = resp.json()
284 rjson = resp.json()
276 self.assertEqual(rjson['name'], path.rsplit('/', 1)[-1])
285 self.assertEqual(rjson['name'], path.rsplit('/', 1)[-1])
277 self.assertEqual(rjson['path'], path)
286 self.assertEqual(rjson['path'], path)
278 self.assertEqual(rjson['type'], type)
287 self.assertEqual(rjson['type'], type)
279 isright = os.path.isdir if type == 'directory' else os.path.isfile
288 isright = os.path.isdir if type == 'directory' else os.path.isfile
280 assert isright(pjoin(
289 assert isright(pjoin(
281 self.notebook_dir.name,
290 self.notebook_dir.name,
282 path.replace('/', os.sep),
291 path.replace('/', os.sep),
283 ))
292 ))
284
293
285 def test_create_untitled(self):
294 def test_create_untitled(self):
286 resp = self.api.create_untitled(path=u'Γ₯ b')
295 resp = self.api.create_untitled(path=u'Γ₯ b')
287 self._check_created(resp, u'Γ₯ b/Untitled0.ipynb')
296 self._check_created(resp, u'Γ₯ b/Untitled0.ipynb')
288
297
289 # Second time
298 # Second time
290 resp = self.api.create_untitled(path=u'Γ₯ b')
299 resp = self.api.create_untitled(path=u'Γ₯ b')
291 self._check_created(resp, u'Γ₯ b/Untitled1.ipynb')
300 self._check_created(resp, u'Γ₯ b/Untitled1.ipynb')
292
301
293 # And two directories down
302 # And two directories down
294 resp = self.api.create_untitled(path='foo/bar')
303 resp = self.api.create_untitled(path='foo/bar')
295 self._check_created(resp, 'foo/bar/Untitled0.ipynb')
304 self._check_created(resp, 'foo/bar/Untitled0.ipynb')
296
305
297 def test_create_untitled_txt(self):
306 def test_create_untitled_txt(self):
298 resp = self.api.create_untitled(path='foo/bar', ext='.txt')
307 resp = self.api.create_untitled(path='foo/bar', ext='.txt')
299 self._check_created(resp, 'foo/bar/untitled0.txt', type='file')
308 self._check_created(resp, 'foo/bar/untitled0.txt', type='file')
300
309
301 resp = self.api.read(path='foo/bar/untitled0.txt')
310 resp = self.api.read(path='foo/bar/untitled0.txt')
302 model = resp.json()
311 model = resp.json()
303 self.assertEqual(model['type'], 'file')
312 self.assertEqual(model['type'], 'file')
304 self.assertEqual(model['format'], 'text')
313 self.assertEqual(model['format'], 'text')
305 self.assertEqual(model['content'], '')
314 self.assertEqual(model['content'], '')
306
315
307 def test_upload(self):
316 def test_upload(self):
308 nb = new_notebook()
317 nb = new_notebook()
309 nbmodel = {'content': nb, 'type': 'notebook'}
318 nbmodel = {'content': nb, 'type': 'notebook'}
310 path = u'Γ₯ b/Upload tΓ©st.ipynb'
319 path = u'Γ₯ b/Upload tΓ©st.ipynb'
311 resp = self.api.upload(path, body=json.dumps(nbmodel))
320 resp = self.api.upload(path, body=json.dumps(nbmodel))
312 self._check_created(resp, path)
321 self._check_created(resp, path)
313
322
314 def test_mkdir_untitled(self):
323 def test_mkdir_untitled(self):
315 resp = self.api.mkdir_untitled(path=u'Γ₯ b')
324 resp = self.api.mkdir_untitled(path=u'Γ₯ b')
316 self._check_created(resp, u'Γ₯ b/Untitled Folder0', type='directory')
325 self._check_created(resp, u'Γ₯ b/Untitled Folder0', type='directory')
317
326
318 # Second time
327 # Second time
319 resp = self.api.mkdir_untitled(path=u'Γ₯ b')
328 resp = self.api.mkdir_untitled(path=u'Γ₯ b')
320 self._check_created(resp, u'Γ₯ b/Untitled Folder1', type='directory')
329 self._check_created(resp, u'Γ₯ b/Untitled Folder1', type='directory')
321
330
322 # And two directories down
331 # And two directories down
323 resp = self.api.mkdir_untitled(path='foo/bar')
332 resp = self.api.mkdir_untitled(path='foo/bar')
324 self._check_created(resp, 'foo/bar/Untitled Folder0', type='directory')
333 self._check_created(resp, 'foo/bar/Untitled Folder0', type='directory')
325
334
326 def test_mkdir(self):
335 def test_mkdir(self):
327 path = u'Γ₯ b/New βˆ‚ir'
336 path = u'Γ₯ b/New βˆ‚ir'
328 resp = self.api.mkdir(path)
337 resp = self.api.mkdir(path)
329 self._check_created(resp, path, type='directory')
338 self._check_created(resp, path, type='directory')
330
339
331 def test_mkdir_hidden_400(self):
340 def test_mkdir_hidden_400(self):
332 with assert_http_error(400):
341 with assert_http_error(400):
333 resp = self.api.mkdir(u'Γ₯ b/.hidden')
342 resp = self.api.mkdir(u'Γ₯ b/.hidden')
334
343
335 def test_upload_txt(self):
344 def test_upload_txt(self):
336 body = u'ΓΌnicode tΓ©xt'
345 body = u'ΓΌnicode tΓ©xt'
337 model = {
346 model = {
338 'content' : body,
347 'content' : body,
339 'format' : 'text',
348 'format' : 'text',
340 'type' : 'file',
349 'type' : 'file',
341 }
350 }
342 path = u'Γ₯ b/Upload tΓ©st.txt'
351 path = u'Γ₯ b/Upload tΓ©st.txt'
343 resp = self.api.upload(path, body=json.dumps(model))
352 resp = self.api.upload(path, body=json.dumps(model))
344
353
345 # check roundtrip
354 # check roundtrip
346 resp = self.api.read(path)
355 resp = self.api.read(path)
347 model = resp.json()
356 model = resp.json()
348 self.assertEqual(model['type'], 'file')
357 self.assertEqual(model['type'], 'file')
349 self.assertEqual(model['format'], 'text')
358 self.assertEqual(model['format'], 'text')
350 self.assertEqual(model['content'], body)
359 self.assertEqual(model['content'], body)
351
360
352 def test_upload_b64(self):
361 def test_upload_b64(self):
353 body = b'\xFFblob'
362 body = b'\xFFblob'
354 b64body = base64.encodestring(body).decode('ascii')
363 b64body = base64.encodestring(body).decode('ascii')
355 model = {
364 model = {
356 'content' : b64body,
365 'content' : b64body,
357 'format' : 'base64',
366 'format' : 'base64',
358 'type' : 'file',
367 'type' : 'file',
359 }
368 }
360 path = u'Γ₯ b/Upload tΓ©st.blob'
369 path = u'Γ₯ b/Upload tΓ©st.blob'
361 resp = self.api.upload(path, body=json.dumps(model))
370 resp = self.api.upload(path, body=json.dumps(model))
362
371
363 # check roundtrip
372 # check roundtrip
364 resp = self.api.read(path)
373 resp = self.api.read(path)
365 model = resp.json()
374 model = resp.json()
366 self.assertEqual(model['type'], 'file')
375 self.assertEqual(model['type'], 'file')
367 self.assertEqual(model['path'], path)
376 self.assertEqual(model['path'], path)
368 self.assertEqual(model['format'], 'base64')
377 self.assertEqual(model['format'], 'base64')
369 decoded = base64.decodestring(model['content'].encode('ascii'))
378 decoded = base64.decodestring(model['content'].encode('ascii'))
370 self.assertEqual(decoded, body)
379 self.assertEqual(decoded, body)
371
380
372 def test_upload_v2(self):
381 def test_upload_v2(self):
373 nb = v2.new_notebook()
382 nb = v2.new_notebook()
374 ws = v2.new_worksheet()
383 ws = v2.new_worksheet()
375 nb.worksheets.append(ws)
384 nb.worksheets.append(ws)
376 ws.cells.append(v2.new_code_cell(input='print("hi")'))
385 ws.cells.append(v2.new_code_cell(input='print("hi")'))
377 nbmodel = {'content': nb, 'type': 'notebook'}
386 nbmodel = {'content': nb, 'type': 'notebook'}
378 path = u'Γ₯ b/Upload tΓ©st.ipynb'
387 path = u'Γ₯ b/Upload tΓ©st.ipynb'
379 resp = self.api.upload(path, body=json.dumps(nbmodel))
388 resp = self.api.upload(path, body=json.dumps(nbmodel))
380 self._check_created(resp, path)
389 self._check_created(resp, path)
381 resp = self.api.read(path)
390 resp = self.api.read(path)
382 data = resp.json()
391 data = resp.json()
383 self.assertEqual(data['content']['nbformat'], 4)
392 self.assertEqual(data['content']['nbformat'], 4)
384
393
385 def test_copy(self):
394 def test_copy(self):
386 resp = self.api.copy(u'Γ₯ b/Γ§ d.ipynb', u'unicodΓ©')
395 resp = self.api.copy(u'Γ₯ b/Γ§ d.ipynb', u'unicodΓ©')
387 self._check_created(resp, u'unicodΓ©/Γ§ d-Copy0.ipynb')
396 self._check_created(resp, u'unicodΓ©/Γ§ d-Copy0.ipynb')
388
397
389 resp = self.api.copy(u'Γ₯ b/Γ§ d.ipynb', u'Γ₯ b')
398 resp = self.api.copy(u'Γ₯ b/Γ§ d.ipynb', u'Γ₯ b')
390 self._check_created(resp, u'Γ₯ b/Γ§ d-Copy0.ipynb')
399 self._check_created(resp, u'Γ₯ b/Γ§ d-Copy0.ipynb')
391
400
392 def test_copy_path(self):
401 def test_copy_path(self):
393 resp = self.api.copy(u'foo/a.ipynb', u'Γ₯ b')
402 resp = self.api.copy(u'foo/a.ipynb', u'Γ₯ b')
394 self._check_created(resp, u'Γ₯ b/a-Copy0.ipynb')
403 self._check_created(resp, u'Γ₯ b/a-Copy0.ipynb')
395
404
396 def test_copy_put_400(self):
405 def test_copy_put_400(self):
397 with assert_http_error(400):
406 with assert_http_error(400):
398 resp = self.api.copy_put(u'Γ₯ b/Γ§ d.ipynb', u'Γ₯ b/cΓΈpy.ipynb')
407 resp = self.api.copy_put(u'Γ₯ b/Γ§ d.ipynb', u'Γ₯ b/cΓΈpy.ipynb')
399
408
400 def test_copy_dir_400(self):
409 def test_copy_dir_400(self):
401 # can't copy directories
410 # can't copy directories
402 with assert_http_error(400):
411 with assert_http_error(400):
403 resp = self.api.copy(u'Γ₯ b', u'foo')
412 resp = self.api.copy(u'Γ₯ b', u'foo')
404
413
405 def test_delete(self):
414 def test_delete(self):
406 for d, name in self.dirs_nbs:
415 for d, name in self.dirs_nbs:
407 print('%r, %r' % (d, name))
416 print('%r, %r' % (d, name))
408 resp = self.api.delete(url_path_join(d, name + '.ipynb'))
417 resp = self.api.delete(url_path_join(d, name + '.ipynb'))
409 self.assertEqual(resp.status_code, 204)
418 self.assertEqual(resp.status_code, 204)
410
419
411 for d in self.dirs + ['/']:
420 for d in self.dirs + ['/']:
412 nbs = notebooks_only(self.api.list(d).json())
421 nbs = notebooks_only(self.api.list(d).json())
413 print('------')
422 print('------')
414 print(d)
423 print(d)
415 print(nbs)
424 print(nbs)
416 self.assertEqual(nbs, [])
425 self.assertEqual(nbs, [])
417
426
418 def test_delete_dirs(self):
427 def test_delete_dirs(self):
419 # depth-first delete everything, so we don't try to delete empty directories
428 # depth-first delete everything, so we don't try to delete empty directories
420 for name in sorted(self.dirs + ['/'], key=len, reverse=True):
429 for name in sorted(self.dirs + ['/'], key=len, reverse=True):
421 listing = self.api.list(name).json()['content']
430 listing = self.api.list(name).json()['content']
422 for model in listing:
431 for model in listing:
423 self.api.delete(model['path'])
432 self.api.delete(model['path'])
424 listing = self.api.list('/').json()['content']
433 listing = self.api.list('/').json()['content']
425 self.assertEqual(listing, [])
434 self.assertEqual(listing, [])
426
435
427 def test_delete_non_empty_dir(self):
436 def test_delete_non_empty_dir(self):
428 """delete non-empty dir raises 400"""
437 """delete non-empty dir raises 400"""
429 with assert_http_error(400):
438 with assert_http_error(400):
430 self.api.delete(u'Γ₯ b')
439 self.api.delete(u'Γ₯ b')
431
440
432 def test_rename(self):
441 def test_rename(self):
433 resp = self.api.rename('foo/a.ipynb', 'foo/z.ipynb')
442 resp = self.api.rename('foo/a.ipynb', 'foo/z.ipynb')
434 self.assertEqual(resp.headers['Location'].split('/')[-1], 'z.ipynb')
443 self.assertEqual(resp.headers['Location'].split('/')[-1], 'z.ipynb')
435 self.assertEqual(resp.json()['name'], 'z.ipynb')
444 self.assertEqual(resp.json()['name'], 'z.ipynb')
436 self.assertEqual(resp.json()['path'], 'foo/z.ipynb')
445 self.assertEqual(resp.json()['path'], 'foo/z.ipynb')
437 assert os.path.isfile(pjoin(self.notebook_dir.name, 'foo', 'z.ipynb'))
446 assert os.path.isfile(pjoin(self.notebook_dir.name, 'foo', 'z.ipynb'))
438
447
439 nbs = notebooks_only(self.api.list('foo').json())
448 nbs = notebooks_only(self.api.list('foo').json())
440 nbnames = set(n['name'] for n in nbs)
449 nbnames = set(n['name'] for n in nbs)
441 self.assertIn('z.ipynb', nbnames)
450 self.assertIn('z.ipynb', nbnames)
442 self.assertNotIn('a.ipynb', nbnames)
451 self.assertNotIn('a.ipynb', nbnames)
443
452
444 def test_rename_existing(self):
453 def test_rename_existing(self):
445 with assert_http_error(409):
454 with assert_http_error(409):
446 self.api.rename('foo/a.ipynb', 'foo/b.ipynb')
455 self.api.rename('foo/a.ipynb', 'foo/b.ipynb')
447
456
448 def test_save(self):
457 def test_save(self):
449 resp = self.api.read('foo/a.ipynb')
458 resp = self.api.read('foo/a.ipynb')
450 nbcontent = json.loads(resp.text)['content']
459 nbcontent = json.loads(resp.text)['content']
451 nb = from_dict(nbcontent)
460 nb = from_dict(nbcontent)
452 nb.cells.append(new_markdown_cell(u'Created by test Β³'))
461 nb.cells.append(new_markdown_cell(u'Created by test Β³'))
453
462
454 nbmodel= {'content': nb, 'type': 'notebook'}
463 nbmodel= {'content': nb, 'type': 'notebook'}
455 resp = self.api.save('foo/a.ipynb', body=json.dumps(nbmodel))
464 resp = self.api.save('foo/a.ipynb', body=json.dumps(nbmodel))
456
465
457 nbfile = pjoin(self.notebook_dir.name, 'foo', 'a.ipynb')
466 nbfile = pjoin(self.notebook_dir.name, 'foo', 'a.ipynb')
458 with io.open(nbfile, 'r', encoding='utf-8') as f:
467 with io.open(nbfile, 'r', encoding='utf-8') as f:
459 newnb = read(f, as_version=4)
468 newnb = read(f, as_version=4)
460 self.assertEqual(newnb.cells[0].source,
469 self.assertEqual(newnb.cells[0].source,
461 u'Created by test Β³')
470 u'Created by test Β³')
462 nbcontent = self.api.read('foo/a.ipynb').json()['content']
471 nbcontent = self.api.read('foo/a.ipynb').json()['content']
463 newnb = from_dict(nbcontent)
472 newnb = from_dict(nbcontent)
464 self.assertEqual(newnb.cells[0].source,
473 self.assertEqual(newnb.cells[0].source,
465 u'Created by test Β³')
474 u'Created by test Β³')
466
475
467
476
468 def test_checkpoints(self):
477 def test_checkpoints(self):
469 resp = self.api.read('foo/a.ipynb')
478 resp = self.api.read('foo/a.ipynb')
470 r = self.api.new_checkpoint('foo/a.ipynb')
479 r = self.api.new_checkpoint('foo/a.ipynb')
471 self.assertEqual(r.status_code, 201)
480 self.assertEqual(r.status_code, 201)
472 cp1 = r.json()
481 cp1 = r.json()
473 self.assertEqual(set(cp1), {'id', 'last_modified'})
482 self.assertEqual(set(cp1), {'id', 'last_modified'})
474 self.assertEqual(r.headers['Location'].split('/')[-1], cp1['id'])
483 self.assertEqual(r.headers['Location'].split('/')[-1], cp1['id'])
475
484
476 # Modify it
485 # Modify it
477 nbcontent = json.loads(resp.text)['content']
486 nbcontent = json.loads(resp.text)['content']
478 nb = from_dict(nbcontent)
487 nb = from_dict(nbcontent)
479 hcell = new_markdown_cell('Created by test')
488 hcell = new_markdown_cell('Created by test')
480 nb.cells.append(hcell)
489 nb.cells.append(hcell)
481 # Save
490 # Save
482 nbmodel= {'content': nb, 'type': 'notebook'}
491 nbmodel= {'content': nb, 'type': 'notebook'}
483 resp = self.api.save('foo/a.ipynb', body=json.dumps(nbmodel))
492 resp = self.api.save('foo/a.ipynb', body=json.dumps(nbmodel))
484
493
485 # List checkpoints
494 # List checkpoints
486 cps = self.api.get_checkpoints('foo/a.ipynb').json()
495 cps = self.api.get_checkpoints('foo/a.ipynb').json()
487 self.assertEqual(cps, [cp1])
496 self.assertEqual(cps, [cp1])
488
497
489 nbcontent = self.api.read('foo/a.ipynb').json()['content']
498 nbcontent = self.api.read('foo/a.ipynb').json()['content']
490 nb = from_dict(nbcontent)
499 nb = from_dict(nbcontent)
491 self.assertEqual(nb.cells[0].source, 'Created by test')
500 self.assertEqual(nb.cells[0].source, 'Created by test')
492
501
493 # Restore cp1
502 # Restore cp1
494 r = self.api.restore_checkpoint('foo/a.ipynb', cp1['id'])
503 r = self.api.restore_checkpoint('foo/a.ipynb', cp1['id'])
495 self.assertEqual(r.status_code, 204)
504 self.assertEqual(r.status_code, 204)
496 nbcontent = self.api.read('foo/a.ipynb').json()['content']
505 nbcontent = self.api.read('foo/a.ipynb').json()['content']
497 nb = from_dict(nbcontent)
506 nb = from_dict(nbcontent)
498 self.assertEqual(nb.cells, [])
507 self.assertEqual(nb.cells, [])
499
508
500 # Delete cp1
509 # Delete cp1
501 r = self.api.delete_checkpoint('foo/a.ipynb', cp1['id'])
510 r = self.api.delete_checkpoint('foo/a.ipynb', cp1['id'])
502 self.assertEqual(r.status_code, 204)
511 self.assertEqual(r.status_code, 204)
503 cps = self.api.get_checkpoints('foo/a.ipynb').json()
512 cps = self.api.get_checkpoints('foo/a.ipynb').json()
504 self.assertEqual(cps, [])
513 self.assertEqual(cps, [])
@@ -1,362 +1,365 b''
1 # coding: utf-8
1 # coding: utf-8
2 """Tests for the notebook manager."""
2 """Tests for the notebook manager."""
3 from __future__ import print_function
3 from __future__ import print_function
4
4
5 import logging
5 import logging
6 import os
6 import os
7
7
8 from tornado.web import HTTPError
8 from tornado.web import HTTPError
9 from unittest import TestCase
9 from unittest import TestCase
10 from tempfile import NamedTemporaryFile
10 from tempfile import NamedTemporaryFile
11
11
12 from IPython.nbformat import v4 as nbformat
12 from IPython.nbformat import v4 as nbformat
13
13
14 from IPython.utils.tempdir import TemporaryDirectory
14 from IPython.utils.tempdir import TemporaryDirectory
15 from IPython.utils.traitlets import TraitError
15 from IPython.utils.traitlets import TraitError
16 from IPython.html.utils import url_path_join
16 from IPython.html.utils import url_path_join
17 from IPython.testing import decorators as dec
17 from IPython.testing import decorators as dec
18
18
19 from ..filemanager import FileContentsManager
19 from ..filemanager import FileContentsManager
20 from ..manager import ContentsManager
20 from ..manager import ContentsManager
21
21
22
22
23 class TestFileContentsManager(TestCase):
23 class TestFileContentsManager(TestCase):
24
24
25 def test_root_dir(self):
25 def test_root_dir(self):
26 with TemporaryDirectory() as td:
26 with TemporaryDirectory() as td:
27 fm = FileContentsManager(root_dir=td)
27 fm = FileContentsManager(root_dir=td)
28 self.assertEqual(fm.root_dir, td)
28 self.assertEqual(fm.root_dir, td)
29
29
30 def test_missing_root_dir(self):
30 def test_missing_root_dir(self):
31 with TemporaryDirectory() as td:
31 with TemporaryDirectory() as td:
32 root = os.path.join(td, 'notebook', 'dir', 'is', 'missing')
32 root = os.path.join(td, 'notebook', 'dir', 'is', 'missing')
33 self.assertRaises(TraitError, FileContentsManager, root_dir=root)
33 self.assertRaises(TraitError, FileContentsManager, root_dir=root)
34
34
35 def test_invalid_root_dir(self):
35 def test_invalid_root_dir(self):
36 with NamedTemporaryFile() as tf:
36 with NamedTemporaryFile() as tf:
37 self.assertRaises(TraitError, FileContentsManager, root_dir=tf.name)
37 self.assertRaises(TraitError, FileContentsManager, root_dir=tf.name)
38
38
39 def test_get_os_path(self):
39 def test_get_os_path(self):
40 # full filesystem path should be returned with correct operating system
40 # full filesystem path should be returned with correct operating system
41 # separators.
41 # separators.
42 with TemporaryDirectory() as td:
42 with TemporaryDirectory() as td:
43 root = td
43 root = td
44 fm = FileContentsManager(root_dir=root)
44 fm = FileContentsManager(root_dir=root)
45 path = fm._get_os_path('/path/to/notebook/test.ipynb')
45 path = fm._get_os_path('/path/to/notebook/test.ipynb')
46 rel_path_list = '/path/to/notebook/test.ipynb'.split('/')
46 rel_path_list = '/path/to/notebook/test.ipynb'.split('/')
47 fs_path = os.path.join(fm.root_dir, *rel_path_list)
47 fs_path = os.path.join(fm.root_dir, *rel_path_list)
48 self.assertEqual(path, fs_path)
48 self.assertEqual(path, fs_path)
49
49
50 fm = FileContentsManager(root_dir=root)
50 fm = FileContentsManager(root_dir=root)
51 path = fm._get_os_path('test.ipynb')
51 path = fm._get_os_path('test.ipynb')
52 fs_path = os.path.join(fm.root_dir, 'test.ipynb')
52 fs_path = os.path.join(fm.root_dir, 'test.ipynb')
53 self.assertEqual(path, fs_path)
53 self.assertEqual(path, fs_path)
54
54
55 fm = FileContentsManager(root_dir=root)
55 fm = FileContentsManager(root_dir=root)
56 path = fm._get_os_path('////test.ipynb')
56 path = fm._get_os_path('////test.ipynb')
57 fs_path = os.path.join(fm.root_dir, 'test.ipynb')
57 fs_path = os.path.join(fm.root_dir, 'test.ipynb')
58 self.assertEqual(path, fs_path)
58 self.assertEqual(path, fs_path)
59
59
60 def test_checkpoint_subdir(self):
60 def test_checkpoint_subdir(self):
61 subd = u'sub βˆ‚ir'
61 subd = u'sub βˆ‚ir'
62 cp_name = 'test-cp.ipynb'
62 cp_name = 'test-cp.ipynb'
63 with TemporaryDirectory() as td:
63 with TemporaryDirectory() as td:
64 root = td
64 root = td
65 os.mkdir(os.path.join(td, subd))
65 os.mkdir(os.path.join(td, subd))
66 fm = FileContentsManager(root_dir=root)
66 fm = FileContentsManager(root_dir=root)
67 cp_dir = fm.get_checkpoint_path('cp', 'test.ipynb')
67 cp_dir = fm.get_checkpoint_path('cp', 'test.ipynb')
68 cp_subdir = fm.get_checkpoint_path('cp', '/%s/test.ipynb' % subd)
68 cp_subdir = fm.get_checkpoint_path('cp', '/%s/test.ipynb' % subd)
69 self.assertNotEqual(cp_dir, cp_subdir)
69 self.assertNotEqual(cp_dir, cp_subdir)
70 self.assertEqual(cp_dir, os.path.join(root, fm.checkpoint_dir, cp_name))
70 self.assertEqual(cp_dir, os.path.join(root, fm.checkpoint_dir, cp_name))
71 self.assertEqual(cp_subdir, os.path.join(root, subd, fm.checkpoint_dir, cp_name))
71 self.assertEqual(cp_subdir, os.path.join(root, subd, fm.checkpoint_dir, cp_name))
72
72
73
73
74 class TestContentsManager(TestCase):
74 class TestContentsManager(TestCase):
75
75
76 def setUp(self):
76 def setUp(self):
77 self._temp_dir = TemporaryDirectory()
77 self._temp_dir = TemporaryDirectory()
78 self.td = self._temp_dir.name
78 self.td = self._temp_dir.name
79 self.contents_manager = FileContentsManager(
79 self.contents_manager = FileContentsManager(
80 root_dir=self.td,
80 root_dir=self.td,
81 log=logging.getLogger()
81 log=logging.getLogger()
82 )
82 )
83
83
84 def tearDown(self):
84 def tearDown(self):
85 self._temp_dir.cleanup()
85 self._temp_dir.cleanup()
86
86
87 def make_dir(self, abs_path, rel_path):
87 def make_dir(self, abs_path, rel_path):
88 """make subdirectory, rel_path is the relative path
88 """make subdirectory, rel_path is the relative path
89 to that directory from the location where the server started"""
89 to that directory from the location where the server started"""
90 os_path = os.path.join(abs_path, rel_path)
90 os_path = os.path.join(abs_path, rel_path)
91 try:
91 try:
92 os.makedirs(os_path)
92 os.makedirs(os_path)
93 except OSError:
93 except OSError:
94 print("Directory already exists: %r" % os_path)
94 print("Directory already exists: %r" % os_path)
95 return os_path
95 return os_path
96
96
97 def add_code_cell(self, nb):
97 def add_code_cell(self, nb):
98 output = nbformat.new_output("display_data", {'application/javascript': "alert('hi');"})
98 output = nbformat.new_output("display_data", {'application/javascript': "alert('hi');"})
99 cell = nbformat.new_code_cell("print('hi')", outputs=[output])
99 cell = nbformat.new_code_cell("print('hi')", outputs=[output])
100 nb.cells.append(cell)
100 nb.cells.append(cell)
101
101
102 def new_notebook(self):
102 def new_notebook(self):
103 cm = self.contents_manager
103 cm = self.contents_manager
104 model = cm.new_untitled(type='notebook')
104 model = cm.new_untitled(type='notebook')
105 name = model['name']
105 name = model['name']
106 path = model['path']
106 path = model['path']
107
107
108 full_model = cm.get_model(path)
108 full_model = cm.get_model(path)
109 nb = full_model['content']
109 nb = full_model['content']
110 self.add_code_cell(nb)
110 self.add_code_cell(nb)
111
111
112 cm.save(full_model, path)
112 cm.save(full_model, path)
113 return nb, name, path
113 return nb, name, path
114
114
115 def test_new_untitled(self):
115 def test_new_untitled(self):
116 cm = self.contents_manager
116 cm = self.contents_manager
117 # Test in root directory
117 # Test in root directory
118 model = cm.new_untitled(type='notebook')
118 model = cm.new_untitled(type='notebook')
119 assert isinstance(model, dict)
119 assert isinstance(model, dict)
120 self.assertIn('name', model)
120 self.assertIn('name', model)
121 self.assertIn('path', model)
121 self.assertIn('path', model)
122 self.assertIn('type', model)
122 self.assertIn('type', model)
123 self.assertEqual(model['type'], 'notebook')
123 self.assertEqual(model['type'], 'notebook')
124 self.assertEqual(model['name'], 'Untitled0.ipynb')
124 self.assertEqual(model['name'], 'Untitled0.ipynb')
125 self.assertEqual(model['path'], 'Untitled0.ipynb')
125 self.assertEqual(model['path'], 'Untitled0.ipynb')
126
126
127 # Test in sub-directory
127 # Test in sub-directory
128 model = cm.new_untitled(type='directory')
128 model = cm.new_untitled(type='directory')
129 assert isinstance(model, dict)
129 assert isinstance(model, dict)
130 self.assertIn('name', model)
130 self.assertIn('name', model)
131 self.assertIn('path', model)
131 self.assertIn('path', model)
132 self.assertIn('type', model)
132 self.assertIn('type', model)
133 self.assertEqual(model['type'], 'directory')
133 self.assertEqual(model['type'], 'directory')
134 self.assertEqual(model['name'], 'Untitled Folder0')
134 self.assertEqual(model['name'], 'Untitled Folder0')
135 self.assertEqual(model['path'], 'Untitled Folder0')
135 self.assertEqual(model['path'], 'Untitled Folder0')
136 sub_dir = model['path']
136 sub_dir = model['path']
137
137
138 model = cm.new_untitled(path=sub_dir)
138 model = cm.new_untitled(path=sub_dir)
139 assert isinstance(model, dict)
139 assert isinstance(model, dict)
140 self.assertIn('name', model)
140 self.assertIn('name', model)
141 self.assertIn('path', model)
141 self.assertIn('path', model)
142 self.assertIn('type', model)
142 self.assertIn('type', model)
143 self.assertEqual(model['type'], 'file')
143 self.assertEqual(model['type'], 'file')
144 self.assertEqual(model['name'], 'untitled0')
144 self.assertEqual(model['name'], 'untitled0')
145 self.assertEqual(model['path'], '%s/untitled0' % sub_dir)
145 self.assertEqual(model['path'], '%s/untitled0' % sub_dir)
146
146
147 def test_get(self):
147 def test_get(self):
148 cm = self.contents_manager
148 cm = self.contents_manager
149 # Create a notebook
149 # Create a notebook
150 model = cm.new_untitled(type='notebook')
150 model = cm.new_untitled(type='notebook')
151 name = model['name']
151 name = model['name']
152 path = model['path']
152 path = model['path']
153
153
154 # Check that we 'get' on the notebook we just created
154 # Check that we 'get' on the notebook we just created
155 model2 = cm.get_model(path)
155 model2 = cm.get_model(path)
156 assert isinstance(model2, dict)
156 assert isinstance(model2, dict)
157 self.assertIn('name', model2)
157 self.assertIn('name', model2)
158 self.assertIn('path', model2)
158 self.assertIn('path', model2)
159 self.assertEqual(model['name'], name)
159 self.assertEqual(model['name'], name)
160 self.assertEqual(model['path'], path)
160 self.assertEqual(model['path'], path)
161
161
162 nb_as_file = cm.get_model(path, content=True, type_='file')
162 nb_as_file = cm.get_model(path, content=True, type_='file')
163 self.assertEqual(nb_as_file['path'], path)
163 self.assertEqual(nb_as_file['path'], path)
164 self.assertEqual(nb_as_file['type'], 'file')
164 self.assertEqual(nb_as_file['type'], 'file')
165 self.assertEqual(nb_as_file['format'], 'text')
165 self.assertEqual(nb_as_file['format'], 'text')
166 self.assertNotIsInstance(nb_as_file['content'], dict)
166 self.assertNotIsInstance(nb_as_file['content'], dict)
167
167
168 nb_as_bin_file = cm.get_model(path, content=True, type_='file', format='base64')
169 self.assertEqual(nb_as_bin_file['format'], 'base64')
170
168 # Test in sub-directory
171 # Test in sub-directory
169 sub_dir = '/foo/'
172 sub_dir = '/foo/'
170 self.make_dir(cm.root_dir, 'foo')
173 self.make_dir(cm.root_dir, 'foo')
171 model = cm.new_untitled(path=sub_dir, ext='.ipynb')
174 model = cm.new_untitled(path=sub_dir, ext='.ipynb')
172 model2 = cm.get_model(sub_dir + name)
175 model2 = cm.get_model(sub_dir + name)
173 assert isinstance(model2, dict)
176 assert isinstance(model2, dict)
174 self.assertIn('name', model2)
177 self.assertIn('name', model2)
175 self.assertIn('path', model2)
178 self.assertIn('path', model2)
176 self.assertIn('content', model2)
179 self.assertIn('content', model2)
177 self.assertEqual(model2['name'], 'Untitled0.ipynb')
180 self.assertEqual(model2['name'], 'Untitled0.ipynb')
178 self.assertEqual(model2['path'], '{0}/{1}'.format(sub_dir.strip('/'), name))
181 self.assertEqual(model2['path'], '{0}/{1}'.format(sub_dir.strip('/'), name))
179
182
180 # Test getting directory model
183 # Test getting directory model
181 dirmodel = cm.get_model('foo')
184 dirmodel = cm.get_model('foo')
182 self.assertEqual(dirmodel['type'], 'directory')
185 self.assertEqual(dirmodel['type'], 'directory')
183
186
184 with self.assertRaises(HTTPError):
187 with self.assertRaises(HTTPError):
185 cm.get_model('foo', type_='file')
188 cm.get_model('foo', type_='file')
186
189
187
190
188 @dec.skip_win32
191 @dec.skip_win32
189 def test_bad_symlink(self):
192 def test_bad_symlink(self):
190 cm = self.contents_manager
193 cm = self.contents_manager
191 path = 'test bad symlink'
194 path = 'test bad symlink'
192 os_path = self.make_dir(cm.root_dir, path)
195 os_path = self.make_dir(cm.root_dir, path)
193
196
194 file_model = cm.new_untitled(path=path, ext='.txt')
197 file_model = cm.new_untitled(path=path, ext='.txt')
195
198
196 # create a broken symlink
199 # create a broken symlink
197 os.symlink("target", os.path.join(os_path, "bad symlink"))
200 os.symlink("target", os.path.join(os_path, "bad symlink"))
198 model = cm.get_model(path)
201 model = cm.get_model(path)
199 self.assertEqual(model['content'], [file_model])
202 self.assertEqual(model['content'], [file_model])
200
203
201 @dec.skip_win32
204 @dec.skip_win32
202 def test_good_symlink(self):
205 def test_good_symlink(self):
203 cm = self.contents_manager
206 cm = self.contents_manager
204 parent = 'test good symlink'
207 parent = 'test good symlink'
205 name = 'good symlink'
208 name = 'good symlink'
206 path = '{0}/{1}'.format(parent, name)
209 path = '{0}/{1}'.format(parent, name)
207 os_path = self.make_dir(cm.root_dir, parent)
210 os_path = self.make_dir(cm.root_dir, parent)
208
211
209 file_model = cm.new(path=parent + '/zfoo.txt')
212 file_model = cm.new(path=parent + '/zfoo.txt')
210
213
211 # create a good symlink
214 # create a good symlink
212 os.symlink(file_model['name'], os.path.join(os_path, name))
215 os.symlink(file_model['name'], os.path.join(os_path, name))
213 symlink_model = cm.get_model(path, content=False)
216 symlink_model = cm.get_model(path, content=False)
214 dir_model = cm.get_model(parent)
217 dir_model = cm.get_model(parent)
215 self.assertEqual(
218 self.assertEqual(
216 sorted(dir_model['content'], key=lambda x: x['name']),
219 sorted(dir_model['content'], key=lambda x: x['name']),
217 [symlink_model, file_model],
220 [symlink_model, file_model],
218 )
221 )
219
222
220 def test_update(self):
223 def test_update(self):
221 cm = self.contents_manager
224 cm = self.contents_manager
222 # Create a notebook
225 # Create a notebook
223 model = cm.new_untitled(type='notebook')
226 model = cm.new_untitled(type='notebook')
224 name = model['name']
227 name = model['name']
225 path = model['path']
228 path = model['path']
226
229
227 # Change the name in the model for rename
230 # Change the name in the model for rename
228 model['path'] = 'test.ipynb'
231 model['path'] = 'test.ipynb'
229 model = cm.update(model, path)
232 model = cm.update(model, path)
230 assert isinstance(model, dict)
233 assert isinstance(model, dict)
231 self.assertIn('name', model)
234 self.assertIn('name', model)
232 self.assertIn('path', model)
235 self.assertIn('path', model)
233 self.assertEqual(model['name'], 'test.ipynb')
236 self.assertEqual(model['name'], 'test.ipynb')
234
237
235 # Make sure the old name is gone
238 # Make sure the old name is gone
236 self.assertRaises(HTTPError, cm.get_model, path)
239 self.assertRaises(HTTPError, cm.get_model, path)
237
240
238 # Test in sub-directory
241 # Test in sub-directory
239 # Create a directory and notebook in that directory
242 # Create a directory and notebook in that directory
240 sub_dir = '/foo/'
243 sub_dir = '/foo/'
241 self.make_dir(cm.root_dir, 'foo')
244 self.make_dir(cm.root_dir, 'foo')
242 model = cm.new_untitled(path=sub_dir, type='notebook')
245 model = cm.new_untitled(path=sub_dir, type='notebook')
243 name = model['name']
246 name = model['name']
244 path = model['path']
247 path = model['path']
245
248
246 # Change the name in the model for rename
249 # Change the name in the model for rename
247 d = path.rsplit('/', 1)[0]
250 d = path.rsplit('/', 1)[0]
248 new_path = model['path'] = d + '/test_in_sub.ipynb'
251 new_path = model['path'] = d + '/test_in_sub.ipynb'
249 model = cm.update(model, path)
252 model = cm.update(model, path)
250 assert isinstance(model, dict)
253 assert isinstance(model, dict)
251 self.assertIn('name', model)
254 self.assertIn('name', model)
252 self.assertIn('path', model)
255 self.assertIn('path', model)
253 self.assertEqual(model['name'], 'test_in_sub.ipynb')
256 self.assertEqual(model['name'], 'test_in_sub.ipynb')
254 self.assertEqual(model['path'], new_path)
257 self.assertEqual(model['path'], new_path)
255
258
256 # Make sure the old name is gone
259 # Make sure the old name is gone
257 self.assertRaises(HTTPError, cm.get_model, path)
260 self.assertRaises(HTTPError, cm.get_model, path)
258
261
259 def test_save(self):
262 def test_save(self):
260 cm = self.contents_manager
263 cm = self.contents_manager
261 # Create a notebook
264 # Create a notebook
262 model = cm.new_untitled(type='notebook')
265 model = cm.new_untitled(type='notebook')
263 name = model['name']
266 name = model['name']
264 path = model['path']
267 path = model['path']
265
268
266 # Get the model with 'content'
269 # Get the model with 'content'
267 full_model = cm.get_model(path)
270 full_model = cm.get_model(path)
268
271
269 # Save the notebook
272 # Save the notebook
270 model = cm.save(full_model, path)
273 model = cm.save(full_model, path)
271 assert isinstance(model, dict)
274 assert isinstance(model, dict)
272 self.assertIn('name', model)
275 self.assertIn('name', model)
273 self.assertIn('path', model)
276 self.assertIn('path', model)
274 self.assertEqual(model['name'], name)
277 self.assertEqual(model['name'], name)
275 self.assertEqual(model['path'], path)
278 self.assertEqual(model['path'], path)
276
279
277 # Test in sub-directory
280 # Test in sub-directory
278 # Create a directory and notebook in that directory
281 # Create a directory and notebook in that directory
279 sub_dir = '/foo/'
282 sub_dir = '/foo/'
280 self.make_dir(cm.root_dir, 'foo')
283 self.make_dir(cm.root_dir, 'foo')
281 model = cm.new_untitled(path=sub_dir, type='notebook')
284 model = cm.new_untitled(path=sub_dir, type='notebook')
282 name = model['name']
285 name = model['name']
283 path = model['path']
286 path = model['path']
284 model = cm.get_model(path)
287 model = cm.get_model(path)
285
288
286 # Change the name in the model for rename
289 # Change the name in the model for rename
287 model = cm.save(model, path)
290 model = cm.save(model, path)
288 assert isinstance(model, dict)
291 assert isinstance(model, dict)
289 self.assertIn('name', model)
292 self.assertIn('name', model)
290 self.assertIn('path', model)
293 self.assertIn('path', model)
291 self.assertEqual(model['name'], 'Untitled0.ipynb')
294 self.assertEqual(model['name'], 'Untitled0.ipynb')
292 self.assertEqual(model['path'], 'foo/Untitled0.ipynb')
295 self.assertEqual(model['path'], 'foo/Untitled0.ipynb')
293
296
294 def test_delete(self):
297 def test_delete(self):
295 cm = self.contents_manager
298 cm = self.contents_manager
296 # Create a notebook
299 # Create a notebook
297 nb, name, path = self.new_notebook()
300 nb, name, path = self.new_notebook()
298
301
299 # Delete the notebook
302 # Delete the notebook
300 cm.delete(path)
303 cm.delete(path)
301
304
302 # Check that a 'get' on the deleted notebook raises and error
305 # Check that a 'get' on the deleted notebook raises and error
303 self.assertRaises(HTTPError, cm.get_model, path)
306 self.assertRaises(HTTPError, cm.get_model, path)
304
307
305 def test_copy(self):
308 def test_copy(self):
306 cm = self.contents_manager
309 cm = self.contents_manager
307 parent = u'Γ₯ b'
310 parent = u'Γ₯ b'
308 name = u'nb √.ipynb'
311 name = u'nb √.ipynb'
309 path = u'{0}/{1}'.format(parent, name)
312 path = u'{0}/{1}'.format(parent, name)
310 os.mkdir(os.path.join(cm.root_dir, parent))
313 os.mkdir(os.path.join(cm.root_dir, parent))
311 orig = cm.new(path=path)
314 orig = cm.new(path=path)
312
315
313 # copy with unspecified name
316 # copy with unspecified name
314 copy = cm.copy(path)
317 copy = cm.copy(path)
315 self.assertEqual(copy['name'], orig['name'].replace('.ipynb', '-Copy0.ipynb'))
318 self.assertEqual(copy['name'], orig['name'].replace('.ipynb', '-Copy0.ipynb'))
316
319
317 # copy with specified name
320 # copy with specified name
318 copy2 = cm.copy(path, u'Γ₯ b/copy 2.ipynb')
321 copy2 = cm.copy(path, u'Γ₯ b/copy 2.ipynb')
319 self.assertEqual(copy2['name'], u'copy 2.ipynb')
322 self.assertEqual(copy2['name'], u'copy 2.ipynb')
320 self.assertEqual(copy2['path'], u'Γ₯ b/copy 2.ipynb')
323 self.assertEqual(copy2['path'], u'Γ₯ b/copy 2.ipynb')
321
324
322 def test_trust_notebook(self):
325 def test_trust_notebook(self):
323 cm = self.contents_manager
326 cm = self.contents_manager
324 nb, name, path = self.new_notebook()
327 nb, name, path = self.new_notebook()
325
328
326 untrusted = cm.get_model(path)['content']
329 untrusted = cm.get_model(path)['content']
327 assert not cm.notary.check_cells(untrusted)
330 assert not cm.notary.check_cells(untrusted)
328
331
329 # print(untrusted)
332 # print(untrusted)
330 cm.trust_notebook(path)
333 cm.trust_notebook(path)
331 trusted = cm.get_model(path)['content']
334 trusted = cm.get_model(path)['content']
332 # print(trusted)
335 # print(trusted)
333 assert cm.notary.check_cells(trusted)
336 assert cm.notary.check_cells(trusted)
334
337
335 def test_mark_trusted_cells(self):
338 def test_mark_trusted_cells(self):
336 cm = self.contents_manager
339 cm = self.contents_manager
337 nb, name, path = self.new_notebook()
340 nb, name, path = self.new_notebook()
338
341
339 cm.mark_trusted_cells(nb, path)
342 cm.mark_trusted_cells(nb, path)
340 for cell in nb.cells:
343 for cell in nb.cells:
341 if cell.cell_type == 'code':
344 if cell.cell_type == 'code':
342 assert not cell.metadata.trusted
345 assert not cell.metadata.trusted
343
346
344 cm.trust_notebook(path)
347 cm.trust_notebook(path)
345 nb = cm.get_model(path)['content']
348 nb = cm.get_model(path)['content']
346 for cell in nb.cells:
349 for cell in nb.cells:
347 if cell.cell_type == 'code':
350 if cell.cell_type == 'code':
348 assert cell.metadata.trusted
351 assert cell.metadata.trusted
349
352
350 def test_check_and_sign(self):
353 def test_check_and_sign(self):
351 cm = self.contents_manager
354 cm = self.contents_manager
352 nb, name, path = self.new_notebook()
355 nb, name, path = self.new_notebook()
353
356
354 cm.mark_trusted_cells(nb, path)
357 cm.mark_trusted_cells(nb, path)
355 cm.check_and_sign(nb, path)
358 cm.check_and_sign(nb, path)
356 assert not cm.notary.check_signature(nb)
359 assert not cm.notary.check_signature(nb)
357
360
358 cm.trust_notebook(path)
361 cm.trust_notebook(path)
359 nb = cm.get_model(path)['content']
362 nb = cm.get_model(path)['content']
360 cm.mark_trusted_cells(nb, path)
363 cm.mark_trusted_cells(nb, path)
361 cm.check_and_sign(nb, path)
364 cm.check_and_sign(nb, path)
362 assert cm.notary.check_signature(nb)
365 assert cm.notary.check_signature(nb)
General Comments 0
You need to be logged in to leave comments. Login now