filemanager.py
772 lines
| 26.4 KiB
| text/x-python
|
PythonLexer
|
"""A contents manager that uses the local file system for storage."""

# Copyright (c) IPython Development Team.
# Distributed under the terms of the Modified BSD License.
|
r4484 | |||
|
r17525 | import base64 | ||
|
r19727 | from contextlib import contextmanager | ||
|
r19005 | import errno | ||
|
r6030 | import io | ||
|
r4484 | import os | ||
|
r10497 | import shutil | ||
|
r19011 | import mimetypes | ||
|
r12984 | |||
|
r4484 | from tornado import web | ||
|
r19727 | from .manager import ( | ||
CheckpointManager, | ||||
ContentsManager, | ||||
) | ||||
|
r18607 | from IPython import nbformat | ||
|
r17557 | from IPython.utils.io import atomic_writing | ||
|
r19299 | from IPython.utils.importstring import import_item | ||
|
r16486 | from IPython.utils.path import ensure_dir_exists | ||
|
r19299 | from IPython.utils.traitlets import Any, Unicode, Bool, TraitError | ||
|
r19727 | from IPython.utils.py3compat import getcwd, string_types, str_to_unicode | ||
|
r11145 | from IPython.utils import tz | ||
|
r19727 | from IPython.html.utils import ( | ||
is_hidden, | ||||
to_api_path, | ||||
to_os_path, | ||||
) | ||||
|
r4484 | |||
|
# Lazily-created ScriptExporter shared by every call to _post_save_script.
_script_exporter = None


def _post_save_script(model, os_path, contents_manager, **kwargs):
    """Convert a notebook to a script after save, using nbconvert.

    Replaces `ipython notebook --script`.

    Parameters
    ----------
    model : dict
        The model of the file that was just saved; only ``model['type']``
        is inspected. Anything other than ``'notebook'`` is ignored.
    os_path : string
        File system path of the file that was just saved.
    contents_manager : object
        Provides ``log`` and ``root_dir``, and is passed as ``parent`` to
        the ScriptExporter the first time one is created.
    **kwargs
        Accepted and ignored.
    """
    global _script_exporter

    if model['type'] != 'notebook':
        return

    # Import only once we know a notebook has to be converted, so saving any
    # other kind of file never pays for (or fails on) the nbconvert import.
    from IPython.nbconvert.exporters.script import ScriptExporter

    if _script_exporter is None:
        _script_exporter = ScriptExporter(parent=contents_manager)

    log = contents_manager.log
    base = os.path.splitext(os_path)[0]
    script, resources = _script_exporter.from_filename(os_path)
    # The exporter reports which extension the script should get; fall back
    # to '.txt' when it does not.
    script_fname = base + resources.get('output_extension', '.txt')
    log.info("Saving script /%s", to_api_path(script_fname, contents_manager.root_dir))
    with io.open(script_fname, 'w', encoding='utf-8') as f:
        f.write(script)
|
r4484 | |||
|
r17523 | |||
|
class FileManagerMixin(object):
    """
    Mixin for ContentsAPI classes that interact with the filesystem.

    Provides facilities for reading, writing, and copying both notebooks and
    generic files.

    Shared by FileContentsManager and FileCheckpointManager.

    Note
    ----
    Classes using this mixin must provide the following attributes:

    root_dir : unicode
        A directory against which API-style paths are to be resolved.

    log : logging.Logger
    """

    @contextmanager
    def open(self, os_path, *args, **kwargs):
        """wrapper around io.open that turns permission errors into 403"""
        with self.perm_to_403(os_path):
            with io.open(os_path, *args, **kwargs) as f:
                yield f

    @contextmanager
    def atomic_writing(self, os_path, *args, **kwargs):
        """wrapper around atomic_writing that turns permission errors into 403"""
        with self.perm_to_403(os_path):
            # Inside a method this name resolves to the module-level
            # atomic_writing helper, not to this method.
            with atomic_writing(os_path, *args, **kwargs) as f:
                yield f

    @contextmanager
    def perm_to_403(self, os_path=''):
        """context manager for turning permission errors into 403.

        Parameters
        ----------
        os_path : string, optional
            File system path to report in the error message. When empty,
            the filename attached to the exception is used instead.

        Raises
        ------
        web.HTTPError
            403, when the wrapped code fails with EPERM or EACCES. Any other
            error is re-raised unchanged.
        """
        try:
            yield
        # io.open raises IOError on Python 2 (a distinct class from OSError
        # there); on Python 3 the two names are the same class.
        except (IOError, OSError) as e:
            if e.errno in {errno.EPERM, errno.EACCES}:
                # make 403 error message without root prefix
                # this may not work perfectly on unicode paths on Python 2,
                # but nobody should be doing that anyway.
                if not os_path:
                    os_path = str_to_unicode(e.filename or 'unknown file')
                path = to_api_path(os_path, root=self.root_dir)
                raise web.HTTPError(403, u'Permission denied: %s' % path)
            else:
                raise

    def _copy(self, src, dest):
        """copy src to dest

        like shutil.copy2, but log errors in copystat
        """
        shutil.copyfile(src, dest)
        try:
            shutil.copystat(src, dest)
        except OSError:
            # Losing the metadata is not fatal; the content was copied.
            self.log.debug("copystat on %s failed", dest, exc_info=True)

    def _get_os_path(self, path):
        """Given an API path, return its file system path.

        Parameters
        ----------
        path : string
            The relative API path to the named file.

        Returns
        -------
        path : string
            Native, absolute OS path for a file.
        """
        return to_os_path(path, self.root_dir)

    def _read_notebook(self, os_path, as_version=4):
        """Read a notebook from an os path.

        Raises
        ------
        web.HTTPError
            400, when nbformat fails to read the file for any reason.
        """
        with self.open(os_path, 'r', encoding='utf-8') as f:
            try:
                return nbformat.read(f, as_version=as_version)
            except Exception as e:
                raise web.HTTPError(
                    400,
                    u"Unreadable Notebook: %s %r" % (os_path, e),
                )

    def _save_notebook(self, os_path, nb):
        """Save a notebook to an os_path."""
        with self.atomic_writing(os_path, encoding='utf-8') as f:
            nbformat.write(nb, f, version=nbformat.NO_CONVERT)

    def _read_file(self, os_path, format):
        """Read a non-notebook file.

        os_path: The path to be read.
        format:
          If 'text', the contents will be decoded as UTF-8.
          If 'base64', the raw bytes contents will be encoded as base64.
          If not specified, try to decode as UTF-8, and fall back to base64

        Returns a ``(content, format)`` tuple, where format is the one
        actually used: 'text' or 'base64'.

        Raises web.HTTPError 400 when os_path is not a regular file, or when
        'text' was requested and the bytes are not valid UTF-8.
        """
        if not os.path.isfile(os_path):
            raise web.HTTPError(400, "Cannot read non-file %s" % os_path)

        with self.open(os_path, 'rb') as f:
            bcontent = f.read()

        if format is None or format == 'text':
            # Try to interpret as unicode if format is unknown or if unicode
            # was explicitly requested.
            try:
                return bcontent.decode('utf8'), 'text'
            except UnicodeError:
                if format == 'text':
                    raise web.HTTPError(
                        400,
                        "%s is not UTF-8 encoded" % os_path,
                        reason='bad format',
                    )

        # base64.encodestring was removed in Python 3.9; encodebytes is its
        # replacement. Python 2 only has the old name.
        encode = getattr(base64, 'encodebytes', None)
        if encode is None:
            encode = base64.encodestring
        return encode(bcontent).decode('ascii'), 'base64'

    def _save_file(self, os_path, content, format):
        """Save content of a generic file.

        os_path: The path to write to.
        content: The text, or the base64-encoded text, to store.
        format: 'text' (content is encoded as UTF-8) or 'base64' (content is
          decoded from base64 to raw bytes).

        Raises web.HTTPError 400 for any other format, or when the content
        cannot be encoded/decoded.
        """
        if format not in {'text', 'base64'}:
            raise web.HTTPError(
                400,
                "Must specify format of file contents as 'text' or 'base64'",
            )

        # base64.decodestring was removed in Python 3.9; decodebytes is its
        # replacement. Python 2 only has the old name. Resolved outside the
        # try block so a lookup problem is never reported as a client error.
        decode = getattr(base64, 'decodebytes', None)
        if decode is None:
            decode = base64.decodestring

        try:
            if format == 'text':
                bcontent = content.encode('utf8')
            else:
                b64_bytes = content.encode('ascii')
                bcontent = decode(b64_bytes)
        except Exception as e:
            raise web.HTTPError(400, u'Encoding error saving %s: %s' % (os_path, e))

        with self.atomic_writing(os_path, text=False) as f:
            f.write(bcontent)
|
r19727 | |||
class FileCheckpointManager(FileManagerMixin, CheckpointManager):
    """
    A CheckpointManager that caches checkpoints for files in adjacent
    directories.
    """

    checkpoint_dir = Unicode(
        '.ipynb_checkpoints',
        config=True,
        help="""The directory name in which to keep file checkpoints

        This is a path relative to the file's own directory.

        By default, it is .ipynb_checkpoints
        """,
    )

    root_dir = Unicode(config=True)

    # This manager keeps a single checkpoint per file, always under this id.
    _checkpoint_id = u"checkpoint"

    def _root_dir_default(self):
        # Use the parent's root_dir when there is one, else the current
        # working directory.
        try:
            return self.parent.root_dir
        except AttributeError:
            return getcwd()

    # public checkpoint API

    def create_file_checkpoint(self, content, format, path):
        """Create a checkpoint from the current content of a file.

        ``content`` and ``format`` are handed to ``_save_file``; ``path`` is
        the API path of the file being checkpointed. Returns the checkpoint
        info dict built by ``checkpoint_model``.
        """
        path = path.strip('/')
        # only the one checkpoint ID:
        checkpoint_id = self._checkpoint_id
        os_checkpoint_path = self.checkpoint_path(checkpoint_id, path)
        self.log.debug("creating checkpoint for %s", path)
        with self.perm_to_403():
            self._save_file(os_checkpoint_path, content, format=format)

        # return the checkpoint info
        return self.checkpoint_model(checkpoint_id, os_checkpoint_path)

    def create_notebook_checkpoint(self, nb, path):
        """Create a checkpoint from the current content of a notebook.

        ``nb`` is written with ``_save_notebook``; ``path`` is the API path
        of the notebook. Returns the checkpoint info dict built by
        ``checkpoint_model``.
        """
        path = path.strip('/')
        # only the one checkpoint ID:
        checkpoint_id = self._checkpoint_id
        os_checkpoint_path = self.checkpoint_path(checkpoint_id, path)
        self.log.debug("creating checkpoint for %s", path)
        with self.perm_to_403():
            self._save_notebook(os_checkpoint_path, nb)

        # return the checkpoint info
        return self.checkpoint_model(checkpoint_id, os_checkpoint_path)

    def get_checkpoint(self, checkpoint_id, path, type):
        """Get the content of a checkpoint.

        Returns a model suitable for passing to ContentsManager.save.

        Raises web.HTTPError 404 (via ``no_such_checkpoint``) when the
        checkpoint file does not exist.
        """
        path = path.strip('/')
        self.log.info("restoring %s from checkpoint %s", path, checkpoint_id)
        os_checkpoint_path = self.checkpoint_path(checkpoint_id, path)
        if not os.path.isfile(os_checkpoint_path):
            self.no_such_checkpoint(path, checkpoint_id)

        if type == 'notebook':
            return {
                'type': type,
                'content': self._read_notebook(
                    os_checkpoint_path,
                    as_version=4,
                ),
            }

        # Anything else is read as a plain file; _read_file picks 'text'
        # or 'base64' itself because no format is requested.
        content, format = self._read_file(os_checkpoint_path, format=None)
        return {
            'type': type,
            'content': content,
            'format': format,
        }

    def rename_checkpoint(self, checkpoint_id, old_path, new_path):
        """Rename a checkpoint from old_path to new_path.

        Does nothing when no checkpoint file exists for old_path.
        """
        old_cp_path = self.checkpoint_path(checkpoint_id, old_path)
        new_cp_path = self.checkpoint_path(checkpoint_id, new_path)
        if os.path.isfile(old_cp_path):
            self.log.debug(
                "Renaming checkpoint %s -> %s",
                old_cp_path,
                new_cp_path,
            )
            with self.perm_to_403():
                shutil.move(old_cp_path, new_cp_path)

    def delete_checkpoint(self, checkpoint_id, path):
        """delete a file's checkpoint

        Raises web.HTTPError 404 (via ``no_such_checkpoint``) when the
        checkpoint file does not exist.
        """
        path = path.strip('/')
        cp_path = self.checkpoint_path(checkpoint_id, path)
        if not os.path.isfile(cp_path):
            self.no_such_checkpoint(path, checkpoint_id)

        self.log.debug("unlinking %s", cp_path)
        with self.perm_to_403():
            os.unlink(cp_path)

    def list_checkpoints(self, path):
        """list the checkpoints for a given file

        This contents manager currently only supports one checkpoint per file.
        """
        path = path.strip('/')
        checkpoint_id = self._checkpoint_id
        os_path = self.checkpoint_path(checkpoint_id, path)
        if not os.path.isfile(os_path):
            return []
        return [self.checkpoint_model(checkpoint_id, os_path)]

    # Checkpoint-related utilities

    def checkpoint_path(self, checkpoint_id, path):
        """find the path to a checkpoint

        The checkpoint of ``dir/name.ext`` lives at
        ``dir/<checkpoint_dir>/name-<checkpoint_id>.ext``. The checkpoint
        directory is created as a side effect if it does not exist yet.
        """
        path = path.strip('/')
        # The leading '/' guarantees rsplit yields two parts even for a
        # file at the root.
        parent, name = ('/' + path).rsplit('/', 1)
        parent = parent.strip('/')
        basename, ext = os.path.splitext(name)
        filename = u"{name}-{checkpoint_id}{ext}".format(
            name=basename,
            checkpoint_id=checkpoint_id,
            ext=ext,
        )
        os_path = self._get_os_path(path=parent)
        cp_dir = os.path.join(os_path, self.checkpoint_dir)
        with self.perm_to_403():
            ensure_dir_exists(cp_dir)
        return os.path.join(cp_dir, filename)

    def checkpoint_model(self, checkpoint_id, os_path):
        """construct the info dict for a given checkpoint

        The dict holds the checkpoint ``id`` and ``last_modified``, taken
        from the modification time of the file at os_path.
        """
        stats = os.stat(os_path)
        last_modified = tz.utcfromtimestamp(stats.st_mtime)
        return dict(
            id=checkpoint_id,
            last_modified=last_modified,
        )

    # Error Handling

    def no_such_checkpoint(self, path, checkpoint_id):
        """Raise a 404 web.HTTPError naming the missing checkpoint."""
        raise web.HTTPError(
            404,
            u'Checkpoint does not exist: %s@%s' % (path, checkpoint_id)
        )
class FileContentsManager(FileManagerMixin, ContentsManager):
    """A ContentsManager that serves files, notebooks and directories found
    under ``root_dir`` on the local file system."""

    root_dir = Unicode(config=True)

    def _root_dir_default(self):
        # Use the parent's notebook_dir when there is one, else the current
        # working directory.
        try:
            return self.parent.notebook_dir
        except AttributeError:
            return getcwd()

    save_script = Bool(False, config=True, help='DEPRECATED, use post_save_hook')

    def _save_script_changed(self):
        # Logger.warn is a deprecated alias (removed in Python 3.13).
        self.log.warning("""
        `--script` is deprecated. You can trigger nbconvert via pre- or post-save hooks:

            ContentsManager.pre_save_hook
            FileContentsManager.post_save_hook

        A post-save hook has been registered that calls:

            ipython nbconvert --to script [notebook]

        which behaves similarly to `--script`.
        """)

        self.post_save_hook = _post_save_script

    post_save_hook = Any(None, config=True,
        help="""Python callable or importstring thereof

        to be called on the path of a file just saved.

        This can be used to process the file on disk,
        such as converting the notebook to a script or HTML via nbconvert.

        It will be called as (all arguments passed by keyword):

            hook(os_path=os_path, model=model, contents_manager=instance)

        path: the filesystem path to the file just written
        model: the model representing the file
        contents_manager: this ContentsManager instance
        """
    )

    def _post_save_hook_changed(self, name, old, new):
        # A string is taken as an import string and replaced by the object
        # it names; anything else truthy must already be callable.
        if new and isinstance(new, string_types):
            self.post_save_hook = import_item(self.post_save_hook)
        elif new:
            if not callable(new):
                raise TraitError("post_save_hook must be callable")

    def run_post_save_hook(self, model, os_path):
        """Run the post-save hook if defined, and log errors"""
        if self.post_save_hook:
            try:
                self.log.debug("Running post-save hook on %s", os_path)
                self.post_save_hook(os_path=os_path, model=model, contents_manager=self)
            except Exception:
                # A failing hook is logged but does not propagate.
                self.log.error("Post-save hook failed on %s", os_path, exc_info=True)

    def _root_dir_changed(self, name, old, new):
        """Do a bit of validation of the root_dir."""
        if not os.path.isabs(new):
            # If we receive a non-absolute path, make it absolute.
            self.root_dir = os.path.abspath(new)
            return
        if not os.path.isdir(new):
            raise TraitError("%r is not a directory" % new)

    def _checkpoint_manager_class_default(self):
        return FileCheckpointManager

    def is_hidden(self, path):
        """Does the API style path correspond to a hidden directory or file?

        Parameters
        ----------
        path : string
            The path to check. This is an API path (`/` separated,
            relative to root_dir).

        Returns
        -------
        hidden : bool
            Whether the path exists and is hidden.
        """
        path = path.strip('/')
        os_path = self._get_os_path(path=path)
        # Resolves to the module-level is_hidden helper, not this method.
        return is_hidden(os_path, self.root_dir)

    def file_exists(self, path):
        """Returns True if the file exists, else returns False.

        API-style wrapper for os.path.isfile

        Parameters
        ----------
        path : string
            The relative path to the file (with '/' as separator)

        Returns
        -------
        exists : bool
            Whether the file exists.
        """
        path = path.strip('/')
        os_path = self._get_os_path(path)
        return os.path.isfile(os_path)

    def dir_exists(self, path):
        """Does the API-style path refer to an extant directory?

        API-style wrapper for os.path.isdir

        Parameters
        ----------
        path : string
            The path to check. This is an API path (`/` separated,
            relative to root_dir).

        Returns
        -------
        exists : bool
            Whether the path is indeed a directory.
        """
        path = path.strip('/')
        os_path = self._get_os_path(path=path)
        return os.path.isdir(os_path)

    def exists(self, path):
        """Returns True if the path exists, else returns False.

        API-style wrapper for os.path.exists

        Parameters
        ----------
        path : string
            The API path to the file (with '/' as separator)

        Returns
        -------
        exists : bool
            Whether the target exists.
        """
        path = path.strip('/')
        os_path = self._get_os_path(path=path)
        return os.path.exists(os_path)

    def _base_model(self, path):
        """Build the common base of a contents model"""
        os_path = self._get_os_path(path)
        info = os.stat(os_path)
        last_modified = tz.utcfromtimestamp(info.st_mtime)
        created = tz.utcfromtimestamp(info.st_ctime)
        # Create the base model.
        model = {}
        model['name'] = path.rsplit('/', 1)[-1]
        model['path'] = path
        model['last_modified'] = last_modified
        model['created'] = created
        model['content'] = None
        model['format'] = None
        model['mimetype'] = None
        try:
            model['writable'] = os.access(os_path, os.W_OK)
        except OSError:
            self.log.error("Failed to check write permissions on %s", os_path)
            model['writable'] = False
        return model

    def _dir_model(self, path, content=True):
        """Build a model for a directory

        if content is requested, will include a listing of the directory

        Raises web.HTTPError 404 when the path is not a directory, or is a
        hidden directory.
        """
        os_dir = self._get_os_path(path)

        four_o_four = u'directory does not exist: %r' % path

        if not os.path.isdir(os_dir):
            raise web.HTTPError(404, four_o_four)
        elif is_hidden(os_dir, self.root_dir):
            self.log.info("Refusing to serve hidden directory %r, via 404 Error",
                os_dir
            )
            raise web.HTTPError(404, four_o_four)

        model = self._base_model(path)
        model['type'] = 'directory'
        if content:
            model['content'] = contents = []
            for name in os.listdir(os_dir):
                entry_path = os.path.join(os_dir, name)
                # skip over broken symlinks in listing
                if not os.path.exists(entry_path):
                    self.log.warning("%s doesn't exist", entry_path)
                    continue
                elif not os.path.isfile(entry_path) and not os.path.isdir(entry_path):
                    self.log.debug("%s not a regular file", entry_path)
                    continue
                if self.should_list(name) and not is_hidden(entry_path, self.root_dir):
                    contents.append(self.get(
                        path='%s/%s' % (path, name),
                        content=False)
                    )

            model['format'] = 'json'

        return model

    def _file_model(self, path, content=True, format=None):
        """Build a model for a file

        if content is requested, include the file contents.

        format:
          If 'text', the contents will be decoded as UTF-8.
          If 'base64', the raw bytes contents will be encoded as base64.
          If not specified, try to decode as UTF-8, and fall back to base64
        """
        model = self._base_model(path)
        model['type'] = 'file'

        os_path = self._get_os_path(path)

        if content:
            content, format = self._read_file(os_path, format)
            # Used only when the mimetype cannot be guessed from the name.
            default_mime = {
                'text': 'text/plain',
                'base64': 'application/octet-stream'
            }[format]

            model.update(
                content=content,
                format=format,
                mimetype=mimetypes.guess_type(os_path)[0] or default_mime,
            )

        return model

    def _notebook_model(self, path, content=True):
        """Build a notebook model

        if content is requested, the notebook content will be populated
        as a JSON structure (not double-serialized)
        """
        model = self._base_model(path)
        model['type'] = 'notebook'
        if content:
            os_path = self._get_os_path(path)
            nb = self._read_notebook(os_path, as_version=4)
            self.mark_trusted_cells(nb, path)
            model['content'] = nb
            model['format'] = 'json'
            self.validate_notebook_model(model)
        return model

    def get(self, path, content=True, type=None, format=None):
        """ Takes a path for an entity and returns its model

        Parameters
        ----------
        path : str
            the API path that describes the relative path for the target
        content : bool
            Whether to include the contents in the reply
        type : str, optional
            The requested type - 'file', 'notebook', or 'directory'.
            Will raise HTTPError 400 if the content doesn't match.
        format : str, optional
            The requested format for file contents. 'text' or 'base64'.
            Ignored if this returns a notebook or directory model.

        Returns
        -------
        model : dict
            the contents model. If content=True, returns the contents
            of the file or directory as well.

        Raises
        ------
        web.HTTPError
            404 when nothing exists at path; 400 when the requested type
            does not match what is found there.
        """
        path = path.strip('/')

        if not self.exists(path):
            raise web.HTTPError(404, u'No such file or directory: %s' % path)

        os_path = self._get_os_path(path)
        if os.path.isdir(os_path):
            if type not in (None, 'directory'):
                raise web.HTTPError(400,
                                u'%s is a directory, not a %s' % (path, type), reason='bad type')
            model = self._dir_model(path, content=content)
        elif type == 'notebook' or (type is None and path.endswith('.ipynb')):
            model = self._notebook_model(path, content=content)
        else:
            if type == 'directory':
                # Interpolate the path here: without it the client is shown
                # a literal '%s'.
                raise web.HTTPError(400,
                                u'%s is not a directory' % path, reason='bad type')
            model = self._file_model(path, content=content, format=format)
        return model

    def _save_directory(self, os_path, model, path=''):
        """create a directory

        Raises web.HTTPError 400 when os_path is hidden, or exists and is
        not a directory. An already existing directory is left alone.
        """
        if is_hidden(os_path, self.root_dir):
            raise web.HTTPError(400, u'Cannot create hidden directory %r' % os_path)
        if not os.path.exists(os_path):
            with self.perm_to_403():
                os.mkdir(os_path)
        elif not os.path.isdir(os_path):
            raise web.HTTPError(400, u'Not a directory: %s' % (os_path))
        else:
            self.log.debug("Directory %r already exists", os_path)

    def save(self, model, path=''):
        """Save the file model and return the model with no content.

        Raises web.HTTPError 400 when the model has no 'type', has no
        'content' (for anything but a directory), or has an unhandled type;
        500 for any unexpected error while writing.
        """
        path = path.strip('/')

        if 'type' not in model:
            raise web.HTTPError(400, u'No file type provided')
        if 'content' not in model and model['type'] != 'directory':
            raise web.HTTPError(400, u'No file content provided')

        self.run_pre_save_hook(model=model, path=path)

        os_path = self._get_os_path(path)
        self.log.debug("Saving %s", os_path)
        try:
            if model['type'] == 'notebook':
                nb = nbformat.from_dict(model['content'])
                self.check_and_sign(nb, path)
                self._save_notebook(os_path, nb)
                # One checkpoint should always exist for notebooks.
                if not self.checkpoint_manager.list_checkpoints(path):
                    self.checkpoint_manager.create_notebook_checkpoint(
                        nb,
                        path,
                    )
            elif model['type'] == 'file':
                # Missing format will be handled internally by _save_file.
                self._save_file(os_path, model['content'], model.get('format'))
            elif model['type'] == 'directory':
                self._save_directory(os_path, model, path)
            else:
                raise web.HTTPError(400, "Unhandled contents type: %s" % model['type'])
        except web.HTTPError:
            # Already a proper HTTP error; do not wrap it in a 500.
            raise
        except Exception as e:
            self.log.error(u'Error while saving file: %s %s', path, e, exc_info=True)
            raise web.HTTPError(500, u'Unexpected error while saving file: %s %s' % (path, e))

        validation_message = None
        if model['type'] == 'notebook':
            self.validate_notebook_model(model)
            validation_message = model.get('message', None)

        # Re-read the model from disk, without content, for the reply.
        model = self.get(path, content=False)
        if validation_message:
            model['message'] = validation_message

        self.run_post_save_hook(model=model, os_path=os_path)

        return model

    def delete_file(self, path):
        """Delete file at path.

        A directory is removed only when it is empty, or holds nothing but
        the checkpoint directory.

        Raises web.HTTPError 400 for a non-empty directory and 404 when
        path is neither a directory nor a file.
        """
        path = path.strip('/')
        os_path = self._get_os_path(path)
        if os.path.isdir(os_path):
            # Don't delete non-empty directories.
            # A directory containing only leftover checkpoints is
            # considered empty.
            cp_dir = getattr(self.checkpoint_manager, 'checkpoint_dir', None)
            if any(entry != cp_dir for entry in os.listdir(os_path)):
                raise web.HTTPError(400, u'Directory %s not empty' % os_path)

            self.log.debug("Removing directory %s", os_path)
            with self.perm_to_403():
                shutil.rmtree(os_path)
        elif os.path.isfile(os_path):
            self.log.debug("Unlinking file %s", os_path)
            with self.perm_to_403():
                os.unlink(os_path)
        else:
            raise web.HTTPError(404, u'File does not exist: %s' % os_path)

    def rename_file(self, old_path, new_path):
        """Rename a file.

        Does nothing when both paths are equal.

        Raises web.HTTPError 409 when new_path already exists, and 500 for
        any unexpected error while moving.
        """
        old_path = old_path.strip('/')
        new_path = new_path.strip('/')
        if new_path == old_path:
            return

        new_os_path = self._get_os_path(new_path)
        old_os_path = self._get_os_path(old_path)

        # Should we proceed with the move?
        if os.path.exists(new_os_path):
            raise web.HTTPError(409, u'File already exists: %s' % new_path)

        # Move the file
        try:
            with self.perm_to_403():
                shutil.move(old_os_path, new_os_path)
        except web.HTTPError:
            raise
        except Exception as e:
            raise web.HTTPError(500, u'Unknown error renaming file: %s %s' % (old_path, e))

    def info_string(self):
        return "Serving notebooks from local directory: %s" % self.root_dir

    def get_kernel_path(self, path, model=None):
        """Return the initial API path of a kernel associated with a given notebook"""
        # The kernel starts in the notebook's parent directory; '' stands
        # for the root.
        if '/' in path:
            parent_dir = path.rsplit('/', 1)[0]
        else:
            parent_dir = ''
        return parent_dir