##// END OF EJS Templates
Merge PR #7616 at f6260bd: notebook running indicator in dashboard...
Merge PR #7616 at f6260bd: notebook running indicator in dashboard Closes gh-7616

File last commit:

r19839:6ab8853b
r20259:a41eba58 merge
Show More
fileio.py
166 lines | 5.3 KiB | text/x-python | PythonLexer
"""
Utilities for file-based Contents/Checkpoints managers.
"""
# Copyright (c) IPython Development Team.
# Distributed under the terms of the Modified BSD License.
import base64
from contextlib import contextmanager
import errno
import io
import os
import shutil
from tornado.web import HTTPError
from IPython.html.utils import (
to_api_path,
to_os_path,
)
from IPython import nbformat
from IPython.utils.io import atomic_writing
from IPython.utils.py3compat import str_to_unicode
class FileManagerMixin(object):
"""
Mixin for ContentsAPI classes that interact with the filesystem.
Provides facilities for reading, writing, and copying both notebooks and
generic files.
Shared by FileContentsManager and FileCheckpoints.
Note
----
Classes using this mixin must provide the following attributes:
root_dir : unicode
A directory against against which API-style paths are to be resolved.
log : logging.Logger
"""
@contextmanager
def open(self, os_path, *args, **kwargs):
"""wrapper around io.open that turns permission errors into 403"""
with self.perm_to_403(os_path):
with io.open(os_path, *args, **kwargs) as f:
yield f
@contextmanager
def atomic_writing(self, os_path, *args, **kwargs):
"""wrapper around atomic_writing that turns permission errors to 403"""
with self.perm_to_403(os_path):
with atomic_writing(os_path, *args, **kwargs) as f:
yield f
@contextmanager
def perm_to_403(self, os_path=''):
"""context manager for turning permission errors into 403."""
try:
yield
except OSError as e:
if e.errno in {errno.EPERM, errno.EACCES}:
# make 403 error message without root prefix
# this may not work perfectly on unicode paths on Python 2,
# but nobody should be doing that anyway.
if not os_path:
os_path = str_to_unicode(e.filename or 'unknown file')
path = to_api_path(os_path, root=self.root_dir)
raise HTTPError(403, u'Permission denied: %s' % path)
else:
raise
def _copy(self, src, dest):
"""copy src to dest
like shutil.copy2, but log errors in copystat
"""
shutil.copyfile(src, dest)
try:
shutil.copystat(src, dest)
except OSError:
self.log.debug("copystat on %s failed", dest, exc_info=True)
def _get_os_path(self, path):
"""Given an API path, return its file system path.
Parameters
----------
path : string
The relative API path to the named file.
Returns
-------
path : string
Native, absolute OS path to for a file.
"""
return to_os_path(path, self.root_dir)
def _read_notebook(self, os_path, as_version=4):
"""Read a notebook from an os path."""
with self.open(os_path, 'r', encoding='utf-8') as f:
try:
return nbformat.read(f, as_version=as_version)
except Exception as e:
raise HTTPError(
400,
u"Unreadable Notebook: %s %r" % (os_path, e),
)
def _save_notebook(self, os_path, nb):
"""Save a notebook to an os_path."""
with self.atomic_writing(os_path, encoding='utf-8') as f:
nbformat.write(nb, f, version=nbformat.NO_CONVERT)
def _read_file(self, os_path, format):
"""Read a non-notebook file.
os_path: The path to be read.
format:
If 'text', the contents will be decoded as UTF-8.
If 'base64', the raw bytes contents will be encoded as base64.
If not specified, try to decode as UTF-8, and fall back to base64
"""
if not os.path.isfile(os_path):
raise HTTPError(400, "Cannot read non-file %s" % os_path)
with self.open(os_path, 'rb') as f:
bcontent = f.read()
if format is None or format == 'text':
# Try to interpret as unicode if format is unknown or if unicode
# was explicitly requested.
try:
return bcontent.decode('utf8'), 'text'
except UnicodeError:
if format == 'text':
raise HTTPError(
400,
"%s is not UTF-8 encoded" % os_path,
reason='bad format',
)
return base64.encodestring(bcontent).decode('ascii'), 'base64'
def _save_file(self, os_path, content, format):
"""Save content of a generic file."""
if format not in {'text', 'base64'}:
raise HTTPError(
400,
"Must specify format of file contents as 'text' or 'base64'",
)
try:
if format == 'text':
bcontent = content.encode('utf8')
else:
b64_bytes = content.encode('ascii')
bcontent = base64.decodestring(b64_bytes)
except Exception as e:
raise HTTPError(
400, u'Encoding error saving %s: %s' % (os_path, e)
)
with self.atomic_writing(os_path, text=False) as f:
f.write(bcontent)