##// END OF EJS Templates
use function, and tests
use function, and tests

File last commit:

r20658:2e67c042
r21103:b9e59565
Show More
filemanager.py
473 lines | 16.1 KiB | text/x-python | PythonLexer
"""A contents manager that uses the local file system for storage."""
# Copyright (c) IPython Development Team.
# Distributed under the terms of the Modified BSD License.
import io
import os
import shutil
import mimetypes
from tornado import web
from .filecheckpoints import FileCheckpoints
from .fileio import FileManagerMixin
from .manager import ContentsManager
from IPython import nbformat
from IPython.utils.importstring import import_item
from IPython.utils.traitlets import Any, Unicode, Bool, TraitError
from IPython.utils.py3compat import getcwd, string_types
from IPython.utils import tz
from IPython.html.utils import (
is_hidden,
to_api_path,
)
_script_exporter = None
def _post_save_script(model, os_path, contents_manager, **kwargs):
"""convert notebooks to Python script after save with nbconvert
replaces `ipython notebook --script`
"""
from IPython.nbconvert.exporters.script import ScriptExporter
if model['type'] != 'notebook':
return
global _script_exporter
if _script_exporter is None:
_script_exporter = ScriptExporter(parent=contents_manager)
log = contents_manager.log
base, ext = os.path.splitext(os_path)
py_fname = base + '.py'
script, resources = _script_exporter.from_filename(os_path)
script_fname = base + resources.get('output_extension', '.txt')
log.info("Saving script /%s", to_api_path(script_fname, contents_manager.root_dir))
with io.open(script_fname, 'w', encoding='utf-8') as f:
f.write(script)
class FileContentsManager(FileManagerMixin, ContentsManager):
root_dir = Unicode(config=True)
def _root_dir_default(self):
try:
return self.parent.notebook_dir
except AttributeError:
return getcwd()
save_script = Bool(False, config=True, help='DEPRECATED, use post_save_hook')
def _save_script_changed(self):
self.log.warn("""
`--script` is deprecated. You can trigger nbconvert via pre- or post-save hooks:
ContentsManager.pre_save_hook
FileContentsManager.post_save_hook
A post-save hook has been registered that calls:
ipython nbconvert --to script [notebook]
which behaves similarly to `--script`.
""")
self.post_save_hook = _post_save_script
post_save_hook = Any(None, config=True,
help="""Python callable or importstring thereof
to be called on the path of a file just saved.
This can be used to process the file on disk,
such as converting the notebook to a script or HTML via nbconvert.
It will be called as (all arguments passed by keyword)::
hook(os_path=os_path, model=model, contents_manager=instance)
- path: the filesystem path to the file just written
- model: the model representing the file
- contents_manager: this ContentsManager instance
"""
)
def _post_save_hook_changed(self, name, old, new):
if new and isinstance(new, string_types):
self.post_save_hook = import_item(self.post_save_hook)
elif new:
if not callable(new):
raise TraitError("post_save_hook must be callable")
def run_post_save_hook(self, model, os_path):
"""Run the post-save hook if defined, and log errors"""
if self.post_save_hook:
try:
self.log.debug("Running post-save hook on %s", os_path)
self.post_save_hook(os_path=os_path, model=model, contents_manager=self)
except Exception:
self.log.error("Post-save hook failed on %s", os_path, exc_info=True)
def _root_dir_changed(self, name, old, new):
"""Do a bit of validation of the root_dir."""
if not os.path.isabs(new):
# If we receive a non-absolute path, make it absolute.
self.root_dir = os.path.abspath(new)
return
if not os.path.isdir(new):
raise TraitError("%r is not a directory" % new)
def _checkpoints_class_default(self):
return FileCheckpoints
def is_hidden(self, path):
"""Does the API style path correspond to a hidden directory or file?
Parameters
----------
path : string
The path to check. This is an API path (`/` separated,
relative to root_dir).
Returns
-------
hidden : bool
Whether the path exists and is hidden.
"""
path = path.strip('/')
os_path = self._get_os_path(path=path)
return is_hidden(os_path, self.root_dir)
def file_exists(self, path):
"""Returns True if the file exists, else returns False.
API-style wrapper for os.path.isfile
Parameters
----------
path : string
The relative path to the file (with '/' as separator)
Returns
-------
exists : bool
Whether the file exists.
"""
path = path.strip('/')
os_path = self._get_os_path(path)
return os.path.isfile(os_path)
def dir_exists(self, path):
"""Does the API-style path refer to an extant directory?
API-style wrapper for os.path.isdir
Parameters
----------
path : string
The path to check. This is an API path (`/` separated,
relative to root_dir).
Returns
-------
exists : bool
Whether the path is indeed a directory.
"""
path = path.strip('/')
os_path = self._get_os_path(path=path)
return os.path.isdir(os_path)
def exists(self, path):
"""Returns True if the path exists, else returns False.
API-style wrapper for os.path.exists
Parameters
----------
path : string
The API path to the file (with '/' as separator)
Returns
-------
exists : bool
Whether the target exists.
"""
path = path.strip('/')
os_path = self._get_os_path(path=path)
return os.path.exists(os_path)
def _base_model(self, path):
"""Build the common base of a contents model"""
os_path = self._get_os_path(path)
info = os.stat(os_path)
last_modified = tz.utcfromtimestamp(info.st_mtime)
created = tz.utcfromtimestamp(info.st_ctime)
# Create the base model.
model = {}
model['name'] = path.rsplit('/', 1)[-1]
model['path'] = path
model['last_modified'] = last_modified
model['created'] = created
model['content'] = None
model['format'] = None
model['mimetype'] = None
try:
model['writable'] = os.access(os_path, os.W_OK)
except OSError:
self.log.error("Failed to check write permissions on %s", os_path)
model['writable'] = False
return model
def _dir_model(self, path, content=True):
"""Build a model for a directory
if content is requested, will include a listing of the directory
"""
os_path = self._get_os_path(path)
four_o_four = u'directory does not exist: %r' % path
if not os.path.isdir(os_path):
raise web.HTTPError(404, four_o_four)
elif is_hidden(os_path, self.root_dir):
self.log.info("Refusing to serve hidden directory %r, via 404 Error",
os_path
)
raise web.HTTPError(404, four_o_four)
model = self._base_model(path)
model['type'] = 'directory'
if content:
model['content'] = contents = []
os_dir = self._get_os_path(path)
for name in os.listdir(os_dir):
os_path = os.path.join(os_dir, name)
# skip over broken symlinks in listing
if not os.path.exists(os_path):
self.log.warn("%s doesn't exist", os_path)
continue
elif not os.path.isfile(os_path) and not os.path.isdir(os_path):
self.log.debug("%s not a regular file", os_path)
continue
if self.should_list(name) and not is_hidden(os_path, self.root_dir):
contents.append(self.get(
path='%s/%s' % (path, name),
content=False)
)
model['format'] = 'json'
return model
def _file_model(self, path, content=True, format=None):
"""Build a model for a file
if content is requested, include the file contents.
format:
If 'text', the contents will be decoded as UTF-8.
If 'base64', the raw bytes contents will be encoded as base64.
If not specified, try to decode as UTF-8, and fall back to base64
"""
model = self._base_model(path)
model['type'] = 'file'
os_path = self._get_os_path(path)
if content:
content, format = self._read_file(os_path, format)
default_mime = {
'text': 'text/plain',
'base64': 'application/octet-stream'
}[format]
model.update(
content=content,
format=format,
mimetype=mimetypes.guess_type(os_path)[0] or default_mime,
)
return model
def _notebook_model(self, path, content=True):
"""Build a notebook model
if content is requested, the notebook content will be populated
as a JSON structure (not double-serialized)
"""
model = self._base_model(path)
model['type'] = 'notebook'
if content:
os_path = self._get_os_path(path)
nb = self._read_notebook(os_path, as_version=4)
self.mark_trusted_cells(nb, path)
model['content'] = nb
model['format'] = 'json'
self.validate_notebook_model(model)
return model
def get(self, path, content=True, type=None, format=None):
""" Takes a path for an entity and returns its model
Parameters
----------
path : str
the API path that describes the relative path for the target
content : bool
Whether to include the contents in the reply
type : str, optional
The requested type - 'file', 'notebook', or 'directory'.
Will raise HTTPError 400 if the content doesn't match.
format : str, optional
The requested format for file contents. 'text' or 'base64'.
Ignored if this returns a notebook or directory model.
Returns
-------
model : dict
the contents model. If content=True, returns the contents
of the file or directory as well.
"""
path = path.strip('/')
if not self.exists(path):
raise web.HTTPError(404, u'No such file or directory: %s' % path)
os_path = self._get_os_path(path)
if os.path.isdir(os_path):
if type not in (None, 'directory'):
raise web.HTTPError(400,
u'%s is a directory, not a %s' % (path, type), reason='bad type')
model = self._dir_model(path, content=content)
elif type == 'notebook' or (type is None and path.endswith('.ipynb')):
model = self._notebook_model(path, content=content)
else:
if type == 'directory':
raise web.HTTPError(400,
u'%s is not a directory' % path, reason='bad type')
model = self._file_model(path, content=content, format=format)
return model
def _save_directory(self, os_path, model, path=''):
"""create a directory"""
if is_hidden(os_path, self.root_dir):
raise web.HTTPError(400, u'Cannot create hidden directory %r' % os_path)
if not os.path.exists(os_path):
with self.perm_to_403():
os.mkdir(os_path)
elif not os.path.isdir(os_path):
raise web.HTTPError(400, u'Not a directory: %s' % (os_path))
else:
self.log.debug("Directory %r already exists", os_path)
def save(self, model, path=''):
"""Save the file model and return the model with no content."""
path = path.strip('/')
if 'type' not in model:
raise web.HTTPError(400, u'No file type provided')
if 'content' not in model and model['type'] != 'directory':
raise web.HTTPError(400, u'No file content provided')
os_path = self._get_os_path(path)
self.log.debug("Saving %s", os_path)
self.run_pre_save_hook(model=model, path=path)
try:
if model['type'] == 'notebook':
nb = nbformat.from_dict(model['content'])
self.check_and_sign(nb, path)
self._save_notebook(os_path, nb)
# One checkpoint should always exist for notebooks.
if not self.checkpoints.list_checkpoints(path):
self.create_checkpoint(path)
elif model['type'] == 'file':
# Missing format will be handled internally by _save_file.
self._save_file(os_path, model['content'], model.get('format'))
elif model['type'] == 'directory':
self._save_directory(os_path, model, path)
else:
raise web.HTTPError(400, "Unhandled contents type: %s" % model['type'])
except web.HTTPError:
raise
except Exception as e:
self.log.error(u'Error while saving file: %s %s', path, e, exc_info=True)
raise web.HTTPError(500, u'Unexpected error while saving file: %s %s' % (path, e))
validation_message = None
if model['type'] == 'notebook':
self.validate_notebook_model(model)
validation_message = model.get('message', None)
model = self.get(path, content=False)
if validation_message:
model['message'] = validation_message
self.run_post_save_hook(model=model, os_path=os_path)
return model
def delete_file(self, path):
"""Delete file at path."""
path = path.strip('/')
os_path = self._get_os_path(path)
rm = os.unlink
if os.path.isdir(os_path):
listing = os.listdir(os_path)
# Don't delete non-empty directories.
# A directory containing only leftover checkpoints is
# considered empty.
cp_dir = getattr(self.checkpoints, 'checkpoint_dir', None)
for entry in listing:
if entry != cp_dir:
raise web.HTTPError(400, u'Directory %s not empty' % os_path)
elif not os.path.isfile(os_path):
raise web.HTTPError(404, u'File does not exist: %s' % os_path)
if os.path.isdir(os_path):
self.log.debug("Removing directory %s", os_path)
with self.perm_to_403():
shutil.rmtree(os_path)
else:
self.log.debug("Unlinking file %s", os_path)
with self.perm_to_403():
rm(os_path)
def rename_file(self, old_path, new_path):
"""Rename a file."""
old_path = old_path.strip('/')
new_path = new_path.strip('/')
if new_path == old_path:
return
new_os_path = self._get_os_path(new_path)
old_os_path = self._get_os_path(old_path)
# Should we proceed with the move?
if os.path.exists(new_os_path):
raise web.HTTPError(409, u'File already exists: %s' % new_path)
# Move the file
try:
with self.perm_to_403():
shutil.move(old_os_path, new_os_path)
except web.HTTPError:
raise
except Exception as e:
raise web.HTTPError(500, u'Unknown error renaming file: %s %s' % (old_path, e))
def info_string(self):
return "Serving notebooks from local directory: %s" % self.root_dir
def get_kernel_path(self, path, model=None):
"""Return the initial API path of a kernel associated with a given notebook"""
if '/' in path:
parent_dir = path.rsplit('/', 1)[0]
else:
parent_dir = ''
return parent_dir