##// END OF EJS Templates
BUG: Don't allow ContentsManager to delete root....
Scott Sanderson -
Show More
@@ -1,468 +1,471 b''
1 """A base class for contents managers."""
1 """A base class for contents managers."""
2
2
3 # Copyright (c) IPython Development Team.
3 # Copyright (c) IPython Development Team.
4 # Distributed under the terms of the Modified BSD License.
4 # Distributed under the terms of the Modified BSD License.
5
5
6 from fnmatch import fnmatch
6 from fnmatch import fnmatch
7 import itertools
7 import itertools
8 import json
8 import json
9 import os
9 import os
10 import re
10 import re
11
11
12 from tornado.web import HTTPError
12 from tornado.web import HTTPError
13
13
14 from .checkpoints import Checkpoints
14 from .checkpoints import Checkpoints
15 from IPython.config.configurable import LoggingConfigurable
15 from IPython.config.configurable import LoggingConfigurable
16 from IPython.nbformat import sign, validate, ValidationError
16 from IPython.nbformat import sign, validate, ValidationError
17 from IPython.nbformat.v4 import new_notebook
17 from IPython.nbformat.v4 import new_notebook
18 from IPython.utils.importstring import import_item
18 from IPython.utils.importstring import import_item
19 from IPython.utils.traitlets import (
19 from IPython.utils.traitlets import (
20 Any,
20 Any,
21 Dict,
21 Dict,
22 Instance,
22 Instance,
23 List,
23 List,
24 TraitError,
24 TraitError,
25 Type,
25 Type,
26 Unicode,
26 Unicode,
27 )
27 )
28 from IPython.utils.py3compat import string_types
28 from IPython.utils.py3compat import string_types
29
29
30 copy_pat = re.compile(r'\-Copy\d*\.')
30 copy_pat = re.compile(r'\-Copy\d*\.')
31
31
32
32
33 class ContentsManager(LoggingConfigurable):
33 class ContentsManager(LoggingConfigurable):
34 """Base class for serving files and directories.
34 """Base class for serving files and directories.
35
35
36 This serves any text or binary file,
36 This serves any text or binary file,
37 as well as directories,
37 as well as directories,
38 with special handling for JSON notebook documents.
38 with special handling for JSON notebook documents.
39
39
40 Most APIs take a path argument,
40 Most APIs take a path argument,
41 which is always an API-style unicode path,
41 which is always an API-style unicode path,
42 and always refers to a directory.
42 and always refers to a directory.
43
43
44 - unicode, not url-escaped
44 - unicode, not url-escaped
45 - '/'-separated
45 - '/'-separated
46 - leading and trailing '/' will be stripped
46 - leading and trailing '/' will be stripped
47 - if unspecified, path defaults to '',
47 - if unspecified, path defaults to '',
48 indicating the root path.
48 indicating the root path.
49
49
50 """
50 """
51
51
52 notary = Instance(sign.NotebookNotary)
52 notary = Instance(sign.NotebookNotary)
53 def _notary_default(self):
53 def _notary_default(self):
54 return sign.NotebookNotary(parent=self)
54 return sign.NotebookNotary(parent=self)
55
55
56 hide_globs = List(Unicode, [
56 hide_globs = List(Unicode, [
57 u'__pycache__', '*.pyc', '*.pyo',
57 u'__pycache__', '*.pyc', '*.pyo',
58 '.DS_Store', '*.so', '*.dylib', '*~',
58 '.DS_Store', '*.so', '*.dylib', '*~',
59 ], config=True, help="""
59 ], config=True, help="""
60 Glob patterns to hide in file and directory listings.
60 Glob patterns to hide in file and directory listings.
61 """)
61 """)
62
62
63 untitled_notebook = Unicode("Untitled", config=True,
63 untitled_notebook = Unicode("Untitled", config=True,
64 help="The base name used when creating untitled notebooks."
64 help="The base name used when creating untitled notebooks."
65 )
65 )
66
66
67 untitled_file = Unicode("untitled", config=True,
67 untitled_file = Unicode("untitled", config=True,
68 help="The base name used when creating untitled files."
68 help="The base name used when creating untitled files."
69 )
69 )
70
70
71 untitled_directory = Unicode("Untitled Folder", config=True,
71 untitled_directory = Unicode("Untitled Folder", config=True,
72 help="The base name used when creating untitled directories."
72 help="The base name used when creating untitled directories."
73 )
73 )
74
74
75 pre_save_hook = Any(None, config=True,
75 pre_save_hook = Any(None, config=True,
76 help="""Python callable or importstring thereof
76 help="""Python callable or importstring thereof
77
77
78 To be called on a contents model prior to save.
78 To be called on a contents model prior to save.
79
79
80 This can be used to process the structure,
80 This can be used to process the structure,
81 such as removing notebook outputs or other side effects that
81 such as removing notebook outputs or other side effects that
82 should not be saved.
82 should not be saved.
83
83
84 It will be called as (all arguments passed by keyword)::
84 It will be called as (all arguments passed by keyword)::
85
85
86 hook(path=path, model=model, contents_manager=self)
86 hook(path=path, model=model, contents_manager=self)
87
87
88 - model: the model to be saved. Includes file contents.
88 - model: the model to be saved. Includes file contents.
89 Modifying this dict will affect the file that is stored.
89 Modifying this dict will affect the file that is stored.
90 - path: the API path of the save destination
90 - path: the API path of the save destination
91 - contents_manager: this ContentsManager instance
91 - contents_manager: this ContentsManager instance
92 """
92 """
93 )
93 )
94 def _pre_save_hook_changed(self, name, old, new):
94 def _pre_save_hook_changed(self, name, old, new):
95 if new and isinstance(new, string_types):
95 if new and isinstance(new, string_types):
96 self.pre_save_hook = import_item(self.pre_save_hook)
96 self.pre_save_hook = import_item(self.pre_save_hook)
97 elif new:
97 elif new:
98 if not callable(new):
98 if not callable(new):
99 raise TraitError("pre_save_hook must be callable")
99 raise TraitError("pre_save_hook must be callable")
100
100
101 def run_pre_save_hook(self, model, path, **kwargs):
101 def run_pre_save_hook(self, model, path, **kwargs):
102 """Run the pre-save hook if defined, and log errors"""
102 """Run the pre-save hook if defined, and log errors"""
103 if self.pre_save_hook:
103 if self.pre_save_hook:
104 try:
104 try:
105 self.log.debug("Running pre-save hook on %s", path)
105 self.log.debug("Running pre-save hook on %s", path)
106 self.pre_save_hook(model=model, path=path, contents_manager=self, **kwargs)
106 self.pre_save_hook(model=model, path=path, contents_manager=self, **kwargs)
107 except Exception:
107 except Exception:
108 self.log.error("Pre-save hook failed on %s", path, exc_info=True)
108 self.log.error("Pre-save hook failed on %s", path, exc_info=True)
109
109
110 checkpoints_class = Type(Checkpoints, config=True)
110 checkpoints_class = Type(Checkpoints, config=True)
111 checkpoints = Instance(Checkpoints, config=True)
111 checkpoints = Instance(Checkpoints, config=True)
112 checkpoints_kwargs = Dict(config=True)
112 checkpoints_kwargs = Dict(config=True)
113
113
114 def _checkpoints_default(self):
114 def _checkpoints_default(self):
115 return self.checkpoints_class(**self.checkpoints_kwargs)
115 return self.checkpoints_class(**self.checkpoints_kwargs)
116
116
117 def _checkpoints_kwargs_default(self):
117 def _checkpoints_kwargs_default(self):
118 return dict(
118 return dict(
119 parent=self,
119 parent=self,
120 log=self.log,
120 log=self.log,
121 )
121 )
122
122
123 # ContentsManager API part 1: methods that must be
123 # ContentsManager API part 1: methods that must be
124 # implemented in subclasses.
124 # implemented in subclasses.
125
125
126 def dir_exists(self, path):
126 def dir_exists(self, path):
127 """Does the API-style path (directory) actually exist?
127 """Does the API-style path (directory) actually exist?
128
128
129 Like os.path.isdir
129 Like os.path.isdir
130
130
131 Override this method in subclasses.
131 Override this method in subclasses.
132
132
133 Parameters
133 Parameters
134 ----------
134 ----------
135 path : string
135 path : string
136 The path to check
136 The path to check
137
137
138 Returns
138 Returns
139 -------
139 -------
140 exists : bool
140 exists : bool
141 Whether the path does indeed exist.
141 Whether the path does indeed exist.
142 """
142 """
143 raise NotImplementedError
143 raise NotImplementedError
144
144
145 def is_hidden(self, path):
145 def is_hidden(self, path):
146 """Does the API style path correspond to a hidden directory or file?
146 """Does the API style path correspond to a hidden directory or file?
147
147
148 Parameters
148 Parameters
149 ----------
149 ----------
150 path : string
150 path : string
151 The path to check. This is an API path (`/` separated,
151 The path to check. This is an API path (`/` separated,
152 relative to root dir).
152 relative to root dir).
153
153
154 Returns
154 Returns
155 -------
155 -------
156 hidden : bool
156 hidden : bool
157 Whether the path is hidden.
157 Whether the path is hidden.
158
158
159 """
159 """
160 raise NotImplementedError
160 raise NotImplementedError
161
161
162 def file_exists(self, path=''):
162 def file_exists(self, path=''):
163 """Does a file exist at the given path?
163 """Does a file exist at the given path?
164
164
165 Like os.path.isfile
165 Like os.path.isfile
166
166
167 Override this method in subclasses.
167 Override this method in subclasses.
168
168
169 Parameters
169 Parameters
170 ----------
170 ----------
171 name : string
171 name : string
172 The name of the file you are checking.
172 The name of the file you are checking.
173 path : string
173 path : string
174 The relative path to the file's directory (with '/' as separator)
174 The relative path to the file's directory (with '/' as separator)
175
175
176 Returns
176 Returns
177 -------
177 -------
178 exists : bool
178 exists : bool
179 Whether the file exists.
179 Whether the file exists.
180 """
180 """
181 raise NotImplementedError('must be implemented in a subclass')
181 raise NotImplementedError('must be implemented in a subclass')
182
182
183 def exists(self, path):
183 def exists(self, path):
184 """Does a file or directory exist at the given path?
184 """Does a file or directory exist at the given path?
185
185
186 Like os.path.exists
186 Like os.path.exists
187
187
188 Parameters
188 Parameters
189 ----------
189 ----------
190 path : string
190 path : string
191 The relative path to the file's directory (with '/' as separator)
191 The relative path to the file's directory (with '/' as separator)
192
192
193 Returns
193 Returns
194 -------
194 -------
195 exists : bool
195 exists : bool
196 Whether the target exists.
196 Whether the target exists.
197 """
197 """
198 return self.file_exists(path) or self.dir_exists(path)
198 return self.file_exists(path) or self.dir_exists(path)
199
199
200 def get(self, path, content=True, type=None, format=None):
200 def get(self, path, content=True, type=None, format=None):
201 """Get the model of a file or directory with or without content."""
201 """Get the model of a file or directory with or without content."""
202 raise NotImplementedError('must be implemented in a subclass')
202 raise NotImplementedError('must be implemented in a subclass')
203
203
204 def save(self, model, path):
204 def save(self, model, path):
205 """Save the file or directory and return the model with no content.
205 """Save the file or directory and return the model with no content.
206
206
207 Save implementations should call self.run_pre_save_hook(model=model, path=path)
207 Save implementations should call self.run_pre_save_hook(model=model, path=path)
208 prior to writing any data.
208 prior to writing any data.
209 """
209 """
210 raise NotImplementedError('must be implemented in a subclass')
210 raise NotImplementedError('must be implemented in a subclass')
211
211
212 def delete_file(self, path):
212 def delete_file(self, path):
213 """Delete file or directory by path."""
213 """Delete file or directory by path."""
214 raise NotImplementedError('must be implemented in a subclass')
214 raise NotImplementedError('must be implemented in a subclass')
215
215
216 def rename_file(self, old_path, new_path):
216 def rename_file(self, old_path, new_path):
217 """Rename a file."""
217 """Rename a file."""
218 raise NotImplementedError('must be implemented in a subclass')
218 raise NotImplementedError('must be implemented in a subclass')
219
219
220 # ContentsManager API part 2: methods that have useable default
220 # ContentsManager API part 2: methods that have useable default
221 # implementations, but can be overridden in subclasses.
221 # implementations, but can be overridden in subclasses.
222
222
223 def delete(self, path):
223 def delete(self, path):
224 """Delete a file/directory and any associated checkpoints."""
224 """Delete a file/directory and any associated checkpoints."""
225 path = path.strip('/')
226 if not path:
227 raise HTTPError(400, "Can't delete root")
225 self.delete_file(path)
228 self.delete_file(path)
226 self.checkpoints.delete_all_checkpoints(path)
229 self.checkpoints.delete_all_checkpoints(path)
227
230
228 def rename(self, old_path, new_path):
231 def rename(self, old_path, new_path):
229 """Rename a file and any checkpoints associated with that file."""
232 """Rename a file and any checkpoints associated with that file."""
230 self.rename_file(old_path, new_path)
233 self.rename_file(old_path, new_path)
231 self.checkpoints.rename_all_checkpoints(old_path, new_path)
234 self.checkpoints.rename_all_checkpoints(old_path, new_path)
232
235
233 def update(self, model, path):
236 def update(self, model, path):
234 """Update the file's path
237 """Update the file's path
235
238
236 For use in PATCH requests, to enable renaming a file without
239 For use in PATCH requests, to enable renaming a file without
237 re-uploading its contents. Only used for renaming at the moment.
240 re-uploading its contents. Only used for renaming at the moment.
238 """
241 """
239 path = path.strip('/')
242 path = path.strip('/')
240 new_path = model.get('path', path).strip('/')
243 new_path = model.get('path', path).strip('/')
241 if path != new_path:
244 if path != new_path:
242 self.rename(path, new_path)
245 self.rename(path, new_path)
243 model = self.get(new_path, content=False)
246 model = self.get(new_path, content=False)
244 return model
247 return model
245
248
246 def info_string(self):
249 def info_string(self):
247 return "Serving contents"
250 return "Serving contents"
248
251
249 def get_kernel_path(self, path, model=None):
252 def get_kernel_path(self, path, model=None):
250 """Return the API path for the kernel
253 """Return the API path for the kernel
251
254
252 KernelManagers can turn this value into a filesystem path,
255 KernelManagers can turn this value into a filesystem path,
253 or ignore it altogether.
256 or ignore it altogether.
254
257
255 The default value here will start kernels in the directory of the
258 The default value here will start kernels in the directory of the
256 notebook server. FileContentsManager overrides this to use the
259 notebook server. FileContentsManager overrides this to use the
257 directory containing the notebook.
260 directory containing the notebook.
258 """
261 """
259 return ''
262 return ''
260
263
261 def increment_filename(self, filename, path='', insert=''):
264 def increment_filename(self, filename, path='', insert=''):
262 """Increment a filename until it is unique.
265 """Increment a filename until it is unique.
263
266
264 Parameters
267 Parameters
265 ----------
268 ----------
266 filename : unicode
269 filename : unicode
267 The name of a file, including extension
270 The name of a file, including extension
268 path : unicode
271 path : unicode
269 The API path of the target's directory
272 The API path of the target's directory
270
273
271 Returns
274 Returns
272 -------
275 -------
273 name : unicode
276 name : unicode
274 A filename that is unique, based on the input filename.
277 A filename that is unique, based on the input filename.
275 """
278 """
276 path = path.strip('/')
279 path = path.strip('/')
277 basename, ext = os.path.splitext(filename)
280 basename, ext = os.path.splitext(filename)
278 for i in itertools.count():
281 for i in itertools.count():
279 if i:
282 if i:
280 insert_i = '{}{}'.format(insert, i)
283 insert_i = '{}{}'.format(insert, i)
281 else:
284 else:
282 insert_i = ''
285 insert_i = ''
283 name = u'{basename}{insert}{ext}'.format(basename=basename,
286 name = u'{basename}{insert}{ext}'.format(basename=basename,
284 insert=insert_i, ext=ext)
287 insert=insert_i, ext=ext)
285 if not self.exists(u'{}/{}'.format(path, name)):
288 if not self.exists(u'{}/{}'.format(path, name)):
286 break
289 break
287 return name
290 return name
288
291
289 def validate_notebook_model(self, model):
292 def validate_notebook_model(self, model):
290 """Add failed-validation message to model"""
293 """Add failed-validation message to model"""
291 try:
294 try:
292 validate(model['content'])
295 validate(model['content'])
293 except ValidationError as e:
296 except ValidationError as e:
294 model['message'] = u'Notebook Validation failed: {}:\n{}'.format(
297 model['message'] = u'Notebook Validation failed: {}:\n{}'.format(
295 e.message, json.dumps(e.instance, indent=1, default=lambda obj: '<UNKNOWN>'),
298 e.message, json.dumps(e.instance, indent=1, default=lambda obj: '<UNKNOWN>'),
296 )
299 )
297 return model
300 return model
298
301
299 def new_untitled(self, path='', type='', ext=''):
302 def new_untitled(self, path='', type='', ext=''):
300 """Create a new untitled file or directory in path
303 """Create a new untitled file or directory in path
301
304
302 path must be a directory
305 path must be a directory
303
306
304 File extension can be specified.
307 File extension can be specified.
305
308
306 Use `new` to create files with a fully specified path (including filename).
309 Use `new` to create files with a fully specified path (including filename).
307 """
310 """
308 path = path.strip('/')
311 path = path.strip('/')
309 if not self.dir_exists(path):
312 if not self.dir_exists(path):
310 raise HTTPError(404, 'No such directory: %s' % path)
313 raise HTTPError(404, 'No such directory: %s' % path)
311
314
312 model = {}
315 model = {}
313 if type:
316 if type:
314 model['type'] = type
317 model['type'] = type
315
318
316 if ext == '.ipynb':
319 if ext == '.ipynb':
317 model.setdefault('type', 'notebook')
320 model.setdefault('type', 'notebook')
318 else:
321 else:
319 model.setdefault('type', 'file')
322 model.setdefault('type', 'file')
320
323
321 insert = ''
324 insert = ''
322 if model['type'] == 'directory':
325 if model['type'] == 'directory':
323 untitled = self.untitled_directory
326 untitled = self.untitled_directory
324 insert = ' '
327 insert = ' '
325 elif model['type'] == 'notebook':
328 elif model['type'] == 'notebook':
326 untitled = self.untitled_notebook
329 untitled = self.untitled_notebook
327 ext = '.ipynb'
330 ext = '.ipynb'
328 elif model['type'] == 'file':
331 elif model['type'] == 'file':
329 untitled = self.untitled_file
332 untitled = self.untitled_file
330 else:
333 else:
331 raise HTTPError(400, "Unexpected model type: %r" % model['type'])
334 raise HTTPError(400, "Unexpected model type: %r" % model['type'])
332
335
333 name = self.increment_filename(untitled + ext, path, insert=insert)
336 name = self.increment_filename(untitled + ext, path, insert=insert)
334 path = u'{0}/{1}'.format(path, name)
337 path = u'{0}/{1}'.format(path, name)
335 return self.new(model, path)
338 return self.new(model, path)
336
339
337 def new(self, model=None, path=''):
340 def new(self, model=None, path=''):
338 """Create a new file or directory and return its model with no content.
341 """Create a new file or directory and return its model with no content.
339
342
340 To create a new untitled entity in a directory, use `new_untitled`.
343 To create a new untitled entity in a directory, use `new_untitled`.
341 """
344 """
342 path = path.strip('/')
345 path = path.strip('/')
343 if model is None:
346 if model is None:
344 model = {}
347 model = {}
345
348
346 if path.endswith('.ipynb'):
349 if path.endswith('.ipynb'):
347 model.setdefault('type', 'notebook')
350 model.setdefault('type', 'notebook')
348 else:
351 else:
349 model.setdefault('type', 'file')
352 model.setdefault('type', 'file')
350
353
351 # no content, not a directory, so fill out new-file model
354 # no content, not a directory, so fill out new-file model
352 if 'content' not in model and model['type'] != 'directory':
355 if 'content' not in model and model['type'] != 'directory':
353 if model['type'] == 'notebook':
356 if model['type'] == 'notebook':
354 model['content'] = new_notebook()
357 model['content'] = new_notebook()
355 model['format'] = 'json'
358 model['format'] = 'json'
356 else:
359 else:
357 model['content'] = ''
360 model['content'] = ''
358 model['type'] = 'file'
361 model['type'] = 'file'
359 model['format'] = 'text'
362 model['format'] = 'text'
360
363
361 model = self.save(model, path)
364 model = self.save(model, path)
362 return model
365 return model
363
366
364 def copy(self, from_path, to_path=None):
367 def copy(self, from_path, to_path=None):
365 """Copy an existing file and return its new model.
368 """Copy an existing file and return its new model.
366
369
367 If to_path not specified, it will be the parent directory of from_path.
370 If to_path not specified, it will be the parent directory of from_path.
368 If to_path is a directory, filename will increment `from_path-Copy#.ext`.
371 If to_path is a directory, filename will increment `from_path-Copy#.ext`.
369
372
370 from_path must be a full path to a file.
373 from_path must be a full path to a file.
371 """
374 """
372 path = from_path.strip('/')
375 path = from_path.strip('/')
373 if to_path is not None:
376 if to_path is not None:
374 to_path = to_path.strip('/')
377 to_path = to_path.strip('/')
375
378
376 if '/' in path:
379 if '/' in path:
377 from_dir, from_name = path.rsplit('/', 1)
380 from_dir, from_name = path.rsplit('/', 1)
378 else:
381 else:
379 from_dir = ''
382 from_dir = ''
380 from_name = path
383 from_name = path
381
384
382 model = self.get(path)
385 model = self.get(path)
383 model.pop('path', None)
386 model.pop('path', None)
384 model.pop('name', None)
387 model.pop('name', None)
385 if model['type'] == 'directory':
388 if model['type'] == 'directory':
386 raise HTTPError(400, "Can't copy directories")
389 raise HTTPError(400, "Can't copy directories")
387
390
388 if to_path is None:
391 if to_path is None:
389 to_path = from_dir
392 to_path = from_dir
390 if self.dir_exists(to_path):
393 if self.dir_exists(to_path):
391 name = copy_pat.sub(u'.', from_name)
394 name = copy_pat.sub(u'.', from_name)
392 to_name = self.increment_filename(name, to_path, insert='-Copy')
395 to_name = self.increment_filename(name, to_path, insert='-Copy')
393 to_path = u'{0}/{1}'.format(to_path, to_name)
396 to_path = u'{0}/{1}'.format(to_path, to_name)
394
397
395 model = self.save(model, to_path)
398 model = self.save(model, to_path)
396 return model
399 return model
397
400
398 def log_info(self):
401 def log_info(self):
399 self.log.info(self.info_string())
402 self.log.info(self.info_string())
400
403
401 def trust_notebook(self, path):
404 def trust_notebook(self, path):
402 """Explicitly trust a notebook
405 """Explicitly trust a notebook
403
406
404 Parameters
407 Parameters
405 ----------
408 ----------
406 path : string
409 path : string
407 The path of a notebook
410 The path of a notebook
408 """
411 """
409 model = self.get(path)
412 model = self.get(path)
410 nb = model['content']
413 nb = model['content']
411 self.log.warn("Trusting notebook %s", path)
414 self.log.warn("Trusting notebook %s", path)
412 self.notary.mark_cells(nb, True)
415 self.notary.mark_cells(nb, True)
413 self.save(model, path)
416 self.save(model, path)
414
417
415 def check_and_sign(self, nb, path=''):
418 def check_and_sign(self, nb, path=''):
416 """Check for trusted cells, and sign the notebook.
419 """Check for trusted cells, and sign the notebook.
417
420
418 Called as a part of saving notebooks.
421 Called as a part of saving notebooks.
419
422
420 Parameters
423 Parameters
421 ----------
424 ----------
422 nb : dict
425 nb : dict
423 The notebook dict
426 The notebook dict
424 path : string
427 path : string
425 The notebook's path (for logging)
428 The notebook's path (for logging)
426 """
429 """
427 if self.notary.check_cells(nb):
430 if self.notary.check_cells(nb):
428 self.notary.sign(nb)
431 self.notary.sign(nb)
429 else:
432 else:
430 self.log.warn("Saving untrusted notebook %s", path)
433 self.log.warn("Saving untrusted notebook %s", path)
431
434
432 def mark_trusted_cells(self, nb, path=''):
435 def mark_trusted_cells(self, nb, path=''):
433 """Mark cells as trusted if the notebook signature matches.
436 """Mark cells as trusted if the notebook signature matches.
434
437
435 Called as a part of loading notebooks.
438 Called as a part of loading notebooks.
436
439
437 Parameters
440 Parameters
438 ----------
441 ----------
439 nb : dict
442 nb : dict
440 The notebook object (in current nbformat)
443 The notebook object (in current nbformat)
441 path : string
444 path : string
442 The notebook's path (for logging)
445 The notebook's path (for logging)
443 """
446 """
444 trusted = self.notary.check_signature(nb)
447 trusted = self.notary.check_signature(nb)
445 if not trusted:
448 if not trusted:
446 self.log.warn("Notebook %s is not trusted", path)
449 self.log.warn("Notebook %s is not trusted", path)
447 self.notary.mark_cells(nb, trusted)
450 self.notary.mark_cells(nb, trusted)
448
451
449 def should_list(self, name):
452 def should_list(self, name):
450 """Should this file/directory name be displayed in a listing?"""
453 """Should this file/directory name be displayed in a listing?"""
451 return not any(fnmatch(name, glob) for glob in self.hide_globs)
454 return not any(fnmatch(name, glob) for glob in self.hide_globs)
452
455
453 # Part 3: Checkpoints API
456 # Part 3: Checkpoints API
454 def create_checkpoint(self, path):
457 def create_checkpoint(self, path):
455 """Create a checkpoint."""
458 """Create a checkpoint."""
456 return self.checkpoints.create_checkpoint(self, path)
459 return self.checkpoints.create_checkpoint(self, path)
457
460
458 def restore_checkpoint(self, checkpoint_id, path):
461 def restore_checkpoint(self, checkpoint_id, path):
459 """
462 """
460 Restore a checkpoint.
463 Restore a checkpoint.
461 """
464 """
462 self.checkpoints.restore_checkpoint(self, checkpoint_id, path)
465 self.checkpoints.restore_checkpoint(self, checkpoint_id, path)
463
466
464 def list_checkpoints(self, path):
467 def list_checkpoints(self, path):
465 return self.checkpoints.list_checkpoints(path)
468 return self.checkpoints.list_checkpoints(path)
466
469
467 def delete_checkpoint(self, checkpoint_id, path):
470 def delete_checkpoint(self, checkpoint_id, path):
468 return self.checkpoints.delete_checkpoint(checkpoint_id, path)
471 return self.checkpoints.delete_checkpoint(checkpoint_id, path)
@@ -1,497 +1,503 b''
1 # coding: utf-8
1 # coding: utf-8
2 """Tests for the notebook manager."""
2 """Tests for the notebook manager."""
3 from __future__ import print_function
3 from __future__ import print_function
4
4
5 import os
5 import os
6 import sys
6 import sys
7 import time
7 import time
8 from contextlib import contextmanager
8 from contextlib import contextmanager
9
9
10 from nose import SkipTest
10 from nose import SkipTest
11 from tornado.web import HTTPError
11 from tornado.web import HTTPError
12 from unittest import TestCase
12 from unittest import TestCase
13 from tempfile import NamedTemporaryFile
13 from tempfile import NamedTemporaryFile
14
14
15 from IPython.nbformat import v4 as nbformat
15 from IPython.nbformat import v4 as nbformat
16
16
17 from IPython.utils.tempdir import TemporaryDirectory
17 from IPython.utils.tempdir import TemporaryDirectory
18 from IPython.utils.traitlets import TraitError
18 from IPython.utils.traitlets import TraitError
19 from IPython.testing import decorators as dec
19 from IPython.testing import decorators as dec
20
20
21 from ..filemanager import FileContentsManager
21 from ..filemanager import FileContentsManager
22
22
23
23
24 def _make_dir(contents_manager, api_path):
24 def _make_dir(contents_manager, api_path):
25 """
25 """
26 Make a directory.
26 Make a directory.
27 """
27 """
28 os_path = contents_manager._get_os_path(api_path)
28 os_path = contents_manager._get_os_path(api_path)
29 try:
29 try:
30 os.makedirs(os_path)
30 os.makedirs(os_path)
31 except OSError:
31 except OSError:
32 print("Directory already exists: %r" % os_path)
32 print("Directory already exists: %r" % os_path)
33
33
34
34
35 class TestFileContentsManager(TestCase):
35 class TestFileContentsManager(TestCase):
36
36
37 @contextmanager
37 @contextmanager
38 def assertRaisesHTTPError(self, status, msg=None):
38 def assertRaisesHTTPError(self, status, msg=None):
39 msg = msg or "Should have raised HTTPError(%i)" % status
39 msg = msg or "Should have raised HTTPError(%i)" % status
40 try:
40 try:
41 yield
41 yield
42 except HTTPError as e:
42 except HTTPError as e:
43 self.assertEqual(e.status_code, status)
43 self.assertEqual(e.status_code, status)
44 else:
44 else:
45 self.fail(msg)
45 self.fail(msg)
46
46
47 def symlink(self, contents_manager, src, dst):
47 def symlink(self, contents_manager, src, dst):
48 """Make a symlink to src from dst
48 """Make a symlink to src from dst
49
49
50 src and dst are api_paths
50 src and dst are api_paths
51 """
51 """
52 src_os_path = contents_manager._get_os_path(src)
52 src_os_path = contents_manager._get_os_path(src)
53 dst_os_path = contents_manager._get_os_path(dst)
53 dst_os_path = contents_manager._get_os_path(dst)
54 print(src_os_path, dst_os_path, os.path.isfile(src_os_path))
54 print(src_os_path, dst_os_path, os.path.isfile(src_os_path))
55 os.symlink(src_os_path, dst_os_path)
55 os.symlink(src_os_path, dst_os_path)
56
56
57 def test_root_dir(self):
57 def test_root_dir(self):
58 with TemporaryDirectory() as td:
58 with TemporaryDirectory() as td:
59 fm = FileContentsManager(root_dir=td)
59 fm = FileContentsManager(root_dir=td)
60 self.assertEqual(fm.root_dir, td)
60 self.assertEqual(fm.root_dir, td)
61
61
62 def test_missing_root_dir(self):
62 def test_missing_root_dir(self):
63 with TemporaryDirectory() as td:
63 with TemporaryDirectory() as td:
64 root = os.path.join(td, 'notebook', 'dir', 'is', 'missing')
64 root = os.path.join(td, 'notebook', 'dir', 'is', 'missing')
65 self.assertRaises(TraitError, FileContentsManager, root_dir=root)
65 self.assertRaises(TraitError, FileContentsManager, root_dir=root)
66
66
67 def test_invalid_root_dir(self):
67 def test_invalid_root_dir(self):
68 with NamedTemporaryFile() as tf:
68 with NamedTemporaryFile() as tf:
69 self.assertRaises(TraitError, FileContentsManager, root_dir=tf.name)
69 self.assertRaises(TraitError, FileContentsManager, root_dir=tf.name)
70
70
71 def test_get_os_path(self):
71 def test_get_os_path(self):
72 # full filesystem path should be returned with correct operating system
72 # full filesystem path should be returned with correct operating system
73 # separators.
73 # separators.
74 with TemporaryDirectory() as td:
74 with TemporaryDirectory() as td:
75 root = td
75 root = td
76 fm = FileContentsManager(root_dir=root)
76 fm = FileContentsManager(root_dir=root)
77 path = fm._get_os_path('/path/to/notebook/test.ipynb')
77 path = fm._get_os_path('/path/to/notebook/test.ipynb')
78 rel_path_list = '/path/to/notebook/test.ipynb'.split('/')
78 rel_path_list = '/path/to/notebook/test.ipynb'.split('/')
79 fs_path = os.path.join(fm.root_dir, *rel_path_list)
79 fs_path = os.path.join(fm.root_dir, *rel_path_list)
80 self.assertEqual(path, fs_path)
80 self.assertEqual(path, fs_path)
81
81
82 fm = FileContentsManager(root_dir=root)
82 fm = FileContentsManager(root_dir=root)
83 path = fm._get_os_path('test.ipynb')
83 path = fm._get_os_path('test.ipynb')
84 fs_path = os.path.join(fm.root_dir, 'test.ipynb')
84 fs_path = os.path.join(fm.root_dir, 'test.ipynb')
85 self.assertEqual(path, fs_path)
85 self.assertEqual(path, fs_path)
86
86
87 fm = FileContentsManager(root_dir=root)
87 fm = FileContentsManager(root_dir=root)
88 path = fm._get_os_path('////test.ipynb')
88 path = fm._get_os_path('////test.ipynb')
89 fs_path = os.path.join(fm.root_dir, 'test.ipynb')
89 fs_path = os.path.join(fm.root_dir, 'test.ipynb')
90 self.assertEqual(path, fs_path)
90 self.assertEqual(path, fs_path)
91
91
92 def test_checkpoint_subdir(self):
92 def test_checkpoint_subdir(self):
93 subd = u'sub βˆ‚ir'
93 subd = u'sub βˆ‚ir'
94 cp_name = 'test-cp.ipynb'
94 cp_name = 'test-cp.ipynb'
95 with TemporaryDirectory() as td:
95 with TemporaryDirectory() as td:
96 root = td
96 root = td
97 os.mkdir(os.path.join(td, subd))
97 os.mkdir(os.path.join(td, subd))
98 fm = FileContentsManager(root_dir=root)
98 fm = FileContentsManager(root_dir=root)
99 cpm = fm.checkpoints
99 cpm = fm.checkpoints
100 cp_dir = cpm.checkpoint_path(
100 cp_dir = cpm.checkpoint_path(
101 'cp', 'test.ipynb'
101 'cp', 'test.ipynb'
102 )
102 )
103 cp_subdir = cpm.checkpoint_path(
103 cp_subdir = cpm.checkpoint_path(
104 'cp', '/%s/test.ipynb' % subd
104 'cp', '/%s/test.ipynb' % subd
105 )
105 )
106 self.assertNotEqual(cp_dir, cp_subdir)
106 self.assertNotEqual(cp_dir, cp_subdir)
107 self.assertEqual(cp_dir, os.path.join(root, cpm.checkpoint_dir, cp_name))
107 self.assertEqual(cp_dir, os.path.join(root, cpm.checkpoint_dir, cp_name))
108 self.assertEqual(cp_subdir, os.path.join(root, subd, cpm.checkpoint_dir, cp_name))
108 self.assertEqual(cp_subdir, os.path.join(root, subd, cpm.checkpoint_dir, cp_name))
109
109
110 @dec.skip_win32
110 @dec.skip_win32
111 def test_bad_symlink(self):
111 def test_bad_symlink(self):
112 with TemporaryDirectory() as td:
112 with TemporaryDirectory() as td:
113 cm = FileContentsManager(root_dir=td)
113 cm = FileContentsManager(root_dir=td)
114 path = 'test bad symlink'
114 path = 'test bad symlink'
115 _make_dir(cm, path)
115 _make_dir(cm, path)
116
116
117 file_model = cm.new_untitled(path=path, ext='.txt')
117 file_model = cm.new_untitled(path=path, ext='.txt')
118
118
119 # create a broken symlink
119 # create a broken symlink
120 self.symlink(cm, "target", '%s/%s' % (path, 'bad symlink'))
120 self.symlink(cm, "target", '%s/%s' % (path, 'bad symlink'))
121 model = cm.get(path)
121 model = cm.get(path)
122 self.assertEqual(model['content'], [file_model])
122 self.assertEqual(model['content'], [file_model])
123
123
124 @dec.skip_win32
124 @dec.skip_win32
125 def test_good_symlink(self):
125 def test_good_symlink(self):
126 with TemporaryDirectory() as td:
126 with TemporaryDirectory() as td:
127 cm = FileContentsManager(root_dir=td)
127 cm = FileContentsManager(root_dir=td)
128 parent = 'test good symlink'
128 parent = 'test good symlink'
129 name = 'good symlink'
129 name = 'good symlink'
130 path = '{0}/{1}'.format(parent, name)
130 path = '{0}/{1}'.format(parent, name)
131 _make_dir(cm, parent)
131 _make_dir(cm, parent)
132
132
133 file_model = cm.new(path=parent + '/zfoo.txt')
133 file_model = cm.new(path=parent + '/zfoo.txt')
134
134
135 # create a good symlink
135 # create a good symlink
136 self.symlink(cm, file_model['path'], path)
136 self.symlink(cm, file_model['path'], path)
137 symlink_model = cm.get(path, content=False)
137 symlink_model = cm.get(path, content=False)
138 dir_model = cm.get(parent)
138 dir_model = cm.get(parent)
139 self.assertEqual(
139 self.assertEqual(
140 sorted(dir_model['content'], key=lambda x: x['name']),
140 sorted(dir_model['content'], key=lambda x: x['name']),
141 [symlink_model, file_model],
141 [symlink_model, file_model],
142 )
142 )
143
143
144 def test_403(self):
144 def test_403(self):
145 if hasattr(os, 'getuid'):
145 if hasattr(os, 'getuid'):
146 if os.getuid() == 0:
146 if os.getuid() == 0:
147 raise SkipTest("Can't test permissions as root")
147 raise SkipTest("Can't test permissions as root")
148 if sys.platform.startswith('win'):
148 if sys.platform.startswith('win'):
149 raise SkipTest("Can't test permissions on Windows")
149 raise SkipTest("Can't test permissions on Windows")
150
150
151 with TemporaryDirectory() as td:
151 with TemporaryDirectory() as td:
152 cm = FileContentsManager(root_dir=td)
152 cm = FileContentsManager(root_dir=td)
153 model = cm.new_untitled(type='file')
153 model = cm.new_untitled(type='file')
154 os_path = cm._get_os_path(model['path'])
154 os_path = cm._get_os_path(model['path'])
155
155
156 os.chmod(os_path, 0o400)
156 os.chmod(os_path, 0o400)
157 try:
157 try:
158 with cm.open(os_path, 'w') as f:
158 with cm.open(os_path, 'w') as f:
159 f.write(u"don't care")
159 f.write(u"don't care")
160 except HTTPError as e:
160 except HTTPError as e:
161 self.assertEqual(e.status_code, 403)
161 self.assertEqual(e.status_code, 403)
162 else:
162 else:
163 self.fail("Should have raised HTTPError(403)")
163 self.fail("Should have raised HTTPError(403)")
164
164
165 def test_escape_root(self):
165 def test_escape_root(self):
166 with TemporaryDirectory() as td:
166 with TemporaryDirectory() as td:
167 cm = FileContentsManager(root_dir=td)
167 cm = FileContentsManager(root_dir=td)
168 # make foo, bar next to root
168 # make foo, bar next to root
169 with open(os.path.join(cm.root_dir, '..', 'foo'), 'w') as f:
169 with open(os.path.join(cm.root_dir, '..', 'foo'), 'w') as f:
170 f.write('foo')
170 f.write('foo')
171 with open(os.path.join(cm.root_dir, '..', 'bar'), 'w') as f:
171 with open(os.path.join(cm.root_dir, '..', 'bar'), 'w') as f:
172 f.write('bar')
172 f.write('bar')
173
173
174 with self.assertRaisesHTTPError(404):
174 with self.assertRaisesHTTPError(404):
175 cm.get('..')
175 cm.get('..')
176 with self.assertRaisesHTTPError(404):
176 with self.assertRaisesHTTPError(404):
177 cm.get('foo/../../../bar')
177 cm.get('foo/../../../bar')
178 with self.assertRaisesHTTPError(404):
178 with self.assertRaisesHTTPError(404):
179 cm.delete('../foo')
179 cm.delete('../foo')
180 with self.assertRaisesHTTPError(404):
180 with self.assertRaisesHTTPError(404):
181 cm.rename('../foo', '../bar')
181 cm.rename('../foo', '../bar')
182 with self.assertRaisesHTTPError(404):
182 with self.assertRaisesHTTPError(404):
183 cm.save(model={
183 cm.save(model={
184 'type': 'file',
184 'type': 'file',
185 'content': u'',
185 'content': u'',
186 'format': 'text',
186 'format': 'text',
187 }, path='../foo')
187 }, path='../foo')
188
188
189
189
190 class TestContentsManager(TestCase):
190 class TestContentsManager(TestCase):
191
191
192 def setUp(self):
192 def setUp(self):
193 self._temp_dir = TemporaryDirectory()
193 self._temp_dir = TemporaryDirectory()
194 self.td = self._temp_dir.name
194 self.td = self._temp_dir.name
195 self.contents_manager = FileContentsManager(
195 self.contents_manager = FileContentsManager(
196 root_dir=self.td,
196 root_dir=self.td,
197 )
197 )
198
198
199 def tearDown(self):
199 def tearDown(self):
200 self._temp_dir.cleanup()
200 self._temp_dir.cleanup()
201
201
202 def make_dir(self, api_path):
202 def make_dir(self, api_path):
203 """make a subdirectory at api_path
203 """make a subdirectory at api_path
204
204
205 override in subclasses if contents are not on the filesystem.
205 override in subclasses if contents are not on the filesystem.
206 """
206 """
207 _make_dir(self.contents_manager, api_path)
207 _make_dir(self.contents_manager, api_path)
208
208
209 def add_code_cell(self, nb):
209 def add_code_cell(self, nb):
210 output = nbformat.new_output("display_data", {'application/javascript': "alert('hi');"})
210 output = nbformat.new_output("display_data", {'application/javascript': "alert('hi');"})
211 cell = nbformat.new_code_cell("print('hi')", outputs=[output])
211 cell = nbformat.new_code_cell("print('hi')", outputs=[output])
212 nb.cells.append(cell)
212 nb.cells.append(cell)
213
213
214 def new_notebook(self):
214 def new_notebook(self):
215 cm = self.contents_manager
215 cm = self.contents_manager
216 model = cm.new_untitled(type='notebook')
216 model = cm.new_untitled(type='notebook')
217 name = model['name']
217 name = model['name']
218 path = model['path']
218 path = model['path']
219
219
220 full_model = cm.get(path)
220 full_model = cm.get(path)
221 nb = full_model['content']
221 nb = full_model['content']
222 nb['metadata']['counter'] = int(1e6 * time.time())
222 nb['metadata']['counter'] = int(1e6 * time.time())
223 self.add_code_cell(nb)
223 self.add_code_cell(nb)
224
224
225 cm.save(full_model, path)
225 cm.save(full_model, path)
226 return nb, name, path
226 return nb, name, path
227
227
228 def test_new_untitled(self):
228 def test_new_untitled(self):
229 cm = self.contents_manager
229 cm = self.contents_manager
230 # Test in root directory
230 # Test in root directory
231 model = cm.new_untitled(type='notebook')
231 model = cm.new_untitled(type='notebook')
232 assert isinstance(model, dict)
232 assert isinstance(model, dict)
233 self.assertIn('name', model)
233 self.assertIn('name', model)
234 self.assertIn('path', model)
234 self.assertIn('path', model)
235 self.assertIn('type', model)
235 self.assertIn('type', model)
236 self.assertEqual(model['type'], 'notebook')
236 self.assertEqual(model['type'], 'notebook')
237 self.assertEqual(model['name'], 'Untitled.ipynb')
237 self.assertEqual(model['name'], 'Untitled.ipynb')
238 self.assertEqual(model['path'], 'Untitled.ipynb')
238 self.assertEqual(model['path'], 'Untitled.ipynb')
239
239
240 # Test in sub-directory
240 # Test in sub-directory
241 model = cm.new_untitled(type='directory')
241 model = cm.new_untitled(type='directory')
242 assert isinstance(model, dict)
242 assert isinstance(model, dict)
243 self.assertIn('name', model)
243 self.assertIn('name', model)
244 self.assertIn('path', model)
244 self.assertIn('path', model)
245 self.assertIn('type', model)
245 self.assertIn('type', model)
246 self.assertEqual(model['type'], 'directory')
246 self.assertEqual(model['type'], 'directory')
247 self.assertEqual(model['name'], 'Untitled Folder')
247 self.assertEqual(model['name'], 'Untitled Folder')
248 self.assertEqual(model['path'], 'Untitled Folder')
248 self.assertEqual(model['path'], 'Untitled Folder')
249 sub_dir = model['path']
249 sub_dir = model['path']
250
250
251 model = cm.new_untitled(path=sub_dir)
251 model = cm.new_untitled(path=sub_dir)
252 assert isinstance(model, dict)
252 assert isinstance(model, dict)
253 self.assertIn('name', model)
253 self.assertIn('name', model)
254 self.assertIn('path', model)
254 self.assertIn('path', model)
255 self.assertIn('type', model)
255 self.assertIn('type', model)
256 self.assertEqual(model['type'], 'file')
256 self.assertEqual(model['type'], 'file')
257 self.assertEqual(model['name'], 'untitled')
257 self.assertEqual(model['name'], 'untitled')
258 self.assertEqual(model['path'], '%s/untitled' % sub_dir)
258 self.assertEqual(model['path'], '%s/untitled' % sub_dir)
259
259
260 def test_get(self):
260 def test_get(self):
261 cm = self.contents_manager
261 cm = self.contents_manager
262 # Create a notebook
262 # Create a notebook
263 model = cm.new_untitled(type='notebook')
263 model = cm.new_untitled(type='notebook')
264 name = model['name']
264 name = model['name']
265 path = model['path']
265 path = model['path']
266
266
267 # Check that we 'get' on the notebook we just created
267 # Check that we 'get' on the notebook we just created
268 model2 = cm.get(path)
268 model2 = cm.get(path)
269 assert isinstance(model2, dict)
269 assert isinstance(model2, dict)
270 self.assertIn('name', model2)
270 self.assertIn('name', model2)
271 self.assertIn('path', model2)
271 self.assertIn('path', model2)
272 self.assertEqual(model['name'], name)
272 self.assertEqual(model['name'], name)
273 self.assertEqual(model['path'], path)
273 self.assertEqual(model['path'], path)
274
274
275 nb_as_file = cm.get(path, content=True, type='file')
275 nb_as_file = cm.get(path, content=True, type='file')
276 self.assertEqual(nb_as_file['path'], path)
276 self.assertEqual(nb_as_file['path'], path)
277 self.assertEqual(nb_as_file['type'], 'file')
277 self.assertEqual(nb_as_file['type'], 'file')
278 self.assertEqual(nb_as_file['format'], 'text')
278 self.assertEqual(nb_as_file['format'], 'text')
279 self.assertNotIsInstance(nb_as_file['content'], dict)
279 self.assertNotIsInstance(nb_as_file['content'], dict)
280
280
281 nb_as_bin_file = cm.get(path, content=True, type='file', format='base64')
281 nb_as_bin_file = cm.get(path, content=True, type='file', format='base64')
282 self.assertEqual(nb_as_bin_file['format'], 'base64')
282 self.assertEqual(nb_as_bin_file['format'], 'base64')
283
283
284 # Test in sub-directory
284 # Test in sub-directory
285 sub_dir = '/foo/'
285 sub_dir = '/foo/'
286 self.make_dir('foo')
286 self.make_dir('foo')
287 model = cm.new_untitled(path=sub_dir, ext='.ipynb')
287 model = cm.new_untitled(path=sub_dir, ext='.ipynb')
288 model2 = cm.get(sub_dir + name)
288 model2 = cm.get(sub_dir + name)
289 assert isinstance(model2, dict)
289 assert isinstance(model2, dict)
290 self.assertIn('name', model2)
290 self.assertIn('name', model2)
291 self.assertIn('path', model2)
291 self.assertIn('path', model2)
292 self.assertIn('content', model2)
292 self.assertIn('content', model2)
293 self.assertEqual(model2['name'], 'Untitled.ipynb')
293 self.assertEqual(model2['name'], 'Untitled.ipynb')
294 self.assertEqual(model2['path'], '{0}/{1}'.format(sub_dir.strip('/'), name))
294 self.assertEqual(model2['path'], '{0}/{1}'.format(sub_dir.strip('/'), name))
295
295
296 # Test with a regular file.
296 # Test with a regular file.
297 file_model_path = cm.new_untitled(path=sub_dir, ext='.txt')['path']
297 file_model_path = cm.new_untitled(path=sub_dir, ext='.txt')['path']
298 file_model = cm.get(file_model_path)
298 file_model = cm.get(file_model_path)
299 self.assertDictContainsSubset(
299 self.assertDictContainsSubset(
300 {
300 {
301 'content': u'',
301 'content': u'',
302 'format': u'text',
302 'format': u'text',
303 'mimetype': u'text/plain',
303 'mimetype': u'text/plain',
304 'name': u'untitled.txt',
304 'name': u'untitled.txt',
305 'path': u'foo/untitled.txt',
305 'path': u'foo/untitled.txt',
306 'type': u'file',
306 'type': u'file',
307 'writable': True,
307 'writable': True,
308 },
308 },
309 file_model,
309 file_model,
310 )
310 )
311 self.assertIn('created', file_model)
311 self.assertIn('created', file_model)
312 self.assertIn('last_modified', file_model)
312 self.assertIn('last_modified', file_model)
313
313
314 # Test getting directory model
314 # Test getting directory model
315
315
316 # Create a sub-sub directory to test getting directory contents with a
316 # Create a sub-sub directory to test getting directory contents with a
317 # subdir.
317 # subdir.
318 self.make_dir('foo/bar')
318 self.make_dir('foo/bar')
319 dirmodel = cm.get('foo')
319 dirmodel = cm.get('foo')
320 self.assertEqual(dirmodel['type'], 'directory')
320 self.assertEqual(dirmodel['type'], 'directory')
321 self.assertIsInstance(dirmodel['content'], list)
321 self.assertIsInstance(dirmodel['content'], list)
322 self.assertEqual(len(dirmodel['content']), 3)
322 self.assertEqual(len(dirmodel['content']), 3)
323 self.assertEqual(dirmodel['path'], 'foo')
323 self.assertEqual(dirmodel['path'], 'foo')
324 self.assertEqual(dirmodel['name'], 'foo')
324 self.assertEqual(dirmodel['name'], 'foo')
325
325
326 # Directory contents should match the contents of each individual entry
326 # Directory contents should match the contents of each individual entry
327 # when requested with content=False.
327 # when requested with content=False.
328 model2_no_content = cm.get(sub_dir + name, content=False)
328 model2_no_content = cm.get(sub_dir + name, content=False)
329 file_model_no_content = cm.get(u'foo/untitled.txt', content=False)
329 file_model_no_content = cm.get(u'foo/untitled.txt', content=False)
330 sub_sub_dir_no_content = cm.get('foo/bar', content=False)
330 sub_sub_dir_no_content = cm.get('foo/bar', content=False)
331 self.assertEqual(sub_sub_dir_no_content['path'], 'foo/bar')
331 self.assertEqual(sub_sub_dir_no_content['path'], 'foo/bar')
332 self.assertEqual(sub_sub_dir_no_content['name'], 'bar')
332 self.assertEqual(sub_sub_dir_no_content['name'], 'bar')
333
333
334 for entry in dirmodel['content']:
334 for entry in dirmodel['content']:
335 # Order isn't guaranteed by the spec, so this is a hacky way of
335 # Order isn't guaranteed by the spec, so this is a hacky way of
336 # verifying that all entries are matched.
336 # verifying that all entries are matched.
337 if entry['path'] == sub_sub_dir_no_content['path']:
337 if entry['path'] == sub_sub_dir_no_content['path']:
338 self.assertEqual(entry, sub_sub_dir_no_content)
338 self.assertEqual(entry, sub_sub_dir_no_content)
339 elif entry['path'] == model2_no_content['path']:
339 elif entry['path'] == model2_no_content['path']:
340 self.assertEqual(entry, model2_no_content)
340 self.assertEqual(entry, model2_no_content)
341 elif entry['path'] == file_model_no_content['path']:
341 elif entry['path'] == file_model_no_content['path']:
342 self.assertEqual(entry, file_model_no_content)
342 self.assertEqual(entry, file_model_no_content)
343 else:
343 else:
344 self.fail("Unexpected directory entry: %s" % entry())
344 self.fail("Unexpected directory entry: %s" % entry())
345
345
346 with self.assertRaises(HTTPError):
346 with self.assertRaises(HTTPError):
347 cm.get('foo', type='file')
347 cm.get('foo', type='file')
348
348
349 def test_update(self):
349 def test_update(self):
350 cm = self.contents_manager
350 cm = self.contents_manager
351 # Create a notebook
351 # Create a notebook
352 model = cm.new_untitled(type='notebook')
352 model = cm.new_untitled(type='notebook')
353 name = model['name']
353 name = model['name']
354 path = model['path']
354 path = model['path']
355
355
356 # Change the name in the model for rename
356 # Change the name in the model for rename
357 model['path'] = 'test.ipynb'
357 model['path'] = 'test.ipynb'
358 model = cm.update(model, path)
358 model = cm.update(model, path)
359 assert isinstance(model, dict)
359 assert isinstance(model, dict)
360 self.assertIn('name', model)
360 self.assertIn('name', model)
361 self.assertIn('path', model)
361 self.assertIn('path', model)
362 self.assertEqual(model['name'], 'test.ipynb')
362 self.assertEqual(model['name'], 'test.ipynb')
363
363
364 # Make sure the old name is gone
364 # Make sure the old name is gone
365 self.assertRaises(HTTPError, cm.get, path)
365 self.assertRaises(HTTPError, cm.get, path)
366
366
367 # Test in sub-directory
367 # Test in sub-directory
368 # Create a directory and notebook in that directory
368 # Create a directory and notebook in that directory
369 sub_dir = '/foo/'
369 sub_dir = '/foo/'
370 self.make_dir('foo')
370 self.make_dir('foo')
371 model = cm.new_untitled(path=sub_dir, type='notebook')
371 model = cm.new_untitled(path=sub_dir, type='notebook')
372 path = model['path']
372 path = model['path']
373
373
374 # Change the name in the model for rename
374 # Change the name in the model for rename
375 d = path.rsplit('/', 1)[0]
375 d = path.rsplit('/', 1)[0]
376 new_path = model['path'] = d + '/test_in_sub.ipynb'
376 new_path = model['path'] = d + '/test_in_sub.ipynb'
377 model = cm.update(model, path)
377 model = cm.update(model, path)
378 assert isinstance(model, dict)
378 assert isinstance(model, dict)
379 self.assertIn('name', model)
379 self.assertIn('name', model)
380 self.assertIn('path', model)
380 self.assertIn('path', model)
381 self.assertEqual(model['name'], 'test_in_sub.ipynb')
381 self.assertEqual(model['name'], 'test_in_sub.ipynb')
382 self.assertEqual(model['path'], new_path)
382 self.assertEqual(model['path'], new_path)
383
383
384 # Make sure the old name is gone
384 # Make sure the old name is gone
385 self.assertRaises(HTTPError, cm.get, path)
385 self.assertRaises(HTTPError, cm.get, path)
386
386
387 def test_save(self):
387 def test_save(self):
388 cm = self.contents_manager
388 cm = self.contents_manager
389 # Create a notebook
389 # Create a notebook
390 model = cm.new_untitled(type='notebook')
390 model = cm.new_untitled(type='notebook')
391 name = model['name']
391 name = model['name']
392 path = model['path']
392 path = model['path']
393
393
394 # Get the model with 'content'
394 # Get the model with 'content'
395 full_model = cm.get(path)
395 full_model = cm.get(path)
396
396
397 # Save the notebook
397 # Save the notebook
398 model = cm.save(full_model, path)
398 model = cm.save(full_model, path)
399 assert isinstance(model, dict)
399 assert isinstance(model, dict)
400 self.assertIn('name', model)
400 self.assertIn('name', model)
401 self.assertIn('path', model)
401 self.assertIn('path', model)
402 self.assertEqual(model['name'], name)
402 self.assertEqual(model['name'], name)
403 self.assertEqual(model['path'], path)
403 self.assertEqual(model['path'], path)
404
404
405 # Test in sub-directory
405 # Test in sub-directory
406 # Create a directory and notebook in that directory
406 # Create a directory and notebook in that directory
407 sub_dir = '/foo/'
407 sub_dir = '/foo/'
408 self.make_dir('foo')
408 self.make_dir('foo')
409 model = cm.new_untitled(path=sub_dir, type='notebook')
409 model = cm.new_untitled(path=sub_dir, type='notebook')
410 name = model['name']
410 name = model['name']
411 path = model['path']
411 path = model['path']
412 model = cm.get(path)
412 model = cm.get(path)
413
413
414 # Change the name in the model for rename
414 # Change the name in the model for rename
415 model = cm.save(model, path)
415 model = cm.save(model, path)
416 assert isinstance(model, dict)
416 assert isinstance(model, dict)
417 self.assertIn('name', model)
417 self.assertIn('name', model)
418 self.assertIn('path', model)
418 self.assertIn('path', model)
419 self.assertEqual(model['name'], 'Untitled.ipynb')
419 self.assertEqual(model['name'], 'Untitled.ipynb')
420 self.assertEqual(model['path'], 'foo/Untitled.ipynb')
420 self.assertEqual(model['path'], 'foo/Untitled.ipynb')
421
421
422 def test_delete(self):
422 def test_delete(self):
423 cm = self.contents_manager
423 cm = self.contents_manager
424 # Create a notebook
424 # Create a notebook
425 nb, name, path = self.new_notebook()
425 nb, name, path = self.new_notebook()
426
426
427 # Delete the notebook
427 # Delete the notebook
428 cm.delete(path)
428 cm.delete(path)
429
429
430 # Check that deleting a non-existent path raises an error.
430 # Check that deleting a non-existent path raises an error.
431 self.assertRaises(HTTPError, cm.delete, path)
431 self.assertRaises(HTTPError, cm.delete, path)
432
432
433 # Check that a 'get' on the deleted notebook raises and error
433 # Check that a 'get' on the deleted notebook raises and error
434 self.assertRaises(HTTPError, cm.get, path)
434 self.assertRaises(HTTPError, cm.get, path)
435
435
436 def test_delete_root(self):
437 cm = self.contents_manager
438 with self.assertRaises(HTTPError) as err:
439 cm.delete('')
440 self.assertEqual(err.exception.status_code, 400)
441
436 def test_copy(self):
442 def test_copy(self):
437 cm = self.contents_manager
443 cm = self.contents_manager
438 parent = u'Γ₯ b'
444 parent = u'Γ₯ b'
439 name = u'nb √.ipynb'
445 name = u'nb √.ipynb'
440 path = u'{0}/{1}'.format(parent, name)
446 path = u'{0}/{1}'.format(parent, name)
441 self.make_dir(parent)
447 self.make_dir(parent)
442
448
443 orig = cm.new(path=path)
449 orig = cm.new(path=path)
444 # copy with unspecified name
450 # copy with unspecified name
445 copy = cm.copy(path)
451 copy = cm.copy(path)
446 self.assertEqual(copy['name'], orig['name'].replace('.ipynb', '-Copy1.ipynb'))
452 self.assertEqual(copy['name'], orig['name'].replace('.ipynb', '-Copy1.ipynb'))
447
453
448 # copy with specified name
454 # copy with specified name
449 copy2 = cm.copy(path, u'Γ₯ b/copy 2.ipynb')
455 copy2 = cm.copy(path, u'Γ₯ b/copy 2.ipynb')
450 self.assertEqual(copy2['name'], u'copy 2.ipynb')
456 self.assertEqual(copy2['name'], u'copy 2.ipynb')
451 self.assertEqual(copy2['path'], u'Γ₯ b/copy 2.ipynb')
457 self.assertEqual(copy2['path'], u'Γ₯ b/copy 2.ipynb')
452 # copy with specified path
458 # copy with specified path
453 copy2 = cm.copy(path, u'/')
459 copy2 = cm.copy(path, u'/')
454 self.assertEqual(copy2['name'], name)
460 self.assertEqual(copy2['name'], name)
455 self.assertEqual(copy2['path'], name)
461 self.assertEqual(copy2['path'], name)
456
462
457 def test_trust_notebook(self):
463 def test_trust_notebook(self):
458 cm = self.contents_manager
464 cm = self.contents_manager
459 nb, name, path = self.new_notebook()
465 nb, name, path = self.new_notebook()
460
466
461 untrusted = cm.get(path)['content']
467 untrusted = cm.get(path)['content']
462 assert not cm.notary.check_cells(untrusted)
468 assert not cm.notary.check_cells(untrusted)
463
469
464 # print(untrusted)
470 # print(untrusted)
465 cm.trust_notebook(path)
471 cm.trust_notebook(path)
466 trusted = cm.get(path)['content']
472 trusted = cm.get(path)['content']
467 # print(trusted)
473 # print(trusted)
468 assert cm.notary.check_cells(trusted)
474 assert cm.notary.check_cells(trusted)
469
475
470 def test_mark_trusted_cells(self):
476 def test_mark_trusted_cells(self):
471 cm = self.contents_manager
477 cm = self.contents_manager
472 nb, name, path = self.new_notebook()
478 nb, name, path = self.new_notebook()
473
479
474 cm.mark_trusted_cells(nb, path)
480 cm.mark_trusted_cells(nb, path)
475 for cell in nb.cells:
481 for cell in nb.cells:
476 if cell.cell_type == 'code':
482 if cell.cell_type == 'code':
477 assert not cell.metadata.trusted
483 assert not cell.metadata.trusted
478
484
479 cm.trust_notebook(path)
485 cm.trust_notebook(path)
480 nb = cm.get(path)['content']
486 nb = cm.get(path)['content']
481 for cell in nb.cells:
487 for cell in nb.cells:
482 if cell.cell_type == 'code':
488 if cell.cell_type == 'code':
483 assert cell.metadata.trusted
489 assert cell.metadata.trusted
484
490
485 def test_check_and_sign(self):
491 def test_check_and_sign(self):
486 cm = self.contents_manager
492 cm = self.contents_manager
487 nb, name, path = self.new_notebook()
493 nb, name, path = self.new_notebook()
488
494
489 cm.mark_trusted_cells(nb, path)
495 cm.mark_trusted_cells(nb, path)
490 cm.check_and_sign(nb, path)
496 cm.check_and_sign(nb, path)
491 assert not cm.notary.check_signature(nb)
497 assert not cm.notary.check_signature(nb)
492
498
493 cm.trust_notebook(path)
499 cm.trust_notebook(path)
494 nb = cm.get(path)['content']
500 nb = cm.get(path)['content']
495 cm.mark_trusted_cells(nb, path)
501 cm.mark_trusted_cells(nb, path)
496 cm.check_and_sign(nb, path)
502 cm.check_and_sign(nb, path)
497 assert cm.notary.check_signature(nb)
503 assert cm.notary.check_signature(nb)
General Comments 0
You need to be logged in to leave comments. Login now