diff --git a/IPython/html/services/contents/checkpoints.py b/IPython/html/services/contents/checkpoints.py
new file mode 100644
index 0000000..0ba546f
--- /dev/null
+++ b/IPython/html/services/contents/checkpoints.py
@@ -0,0 +1,112 @@
+"""
+Classes for managing Checkpoints.
+"""
+
+# Copyright (c) IPython Development Team.
+# Distributed under the terms of the Modified BSD License.
+
+from IPython.config.configurable import LoggingConfigurable
+
+
+class CheckpointManager(LoggingConfigurable):
+ """
+ Base class for managing checkpoints for a ContentsManager.
+
+ Subclasses are required to implement:
+
+ create_checkpoint(self, contents_mgr, path)
+ restore_checkpoint(self, contents_mgr, checkpoint_id, path)
+ rename_checkpoint(self, checkpoint_id, old_path, new_path)
+ delete_checkpoint(self, checkpoint_id, path)
+ list_checkpoints(self, path)
+ """
+ def create_checkpoint(self, contents_mgr, path):
+ """Create a checkpoint."""
+ raise NotImplementedError("must be implemented in a subclass")
+
+ def restore_checkpoint(self, contents_mgr, checkpoint_id, path):
+ """Restore a checkpoint"""
+ raise NotImplementedError("must be implemented in a subclass")
+
+ def rename_checkpoint(self, checkpoint_id, old_path, new_path):
+ """Rename a single checkpoint from old_path to new_path."""
+ raise NotImplementedError("must be implemented in a subclass")
+
+ def delete_checkpoint(self, checkpoint_id, path):
+ """delete a checkpoint for a file"""
+ raise NotImplementedError("must be implemented in a subclass")
+
+ def list_checkpoints(self, path):
+ """Return a list of checkpoints for a given file"""
+ raise NotImplementedError("must be implemented in a subclass")
+
+ def rename_all_checkpoints(self, old_path, new_path):
+ """Rename all checkpoints for old_path to new_path."""
+ for cp in self.list_checkpoints(old_path):
+ self.rename_checkpoint(cp['id'], old_path, new_path)
+
+ def delete_all_checkpoints(self, path):
+ """Delete all checkpoints for the given path."""
+ for checkpoint in self.list_checkpoints(path):
+ self.delete_checkpoint(checkpoint['id'], path)
+
+
+class GenericCheckpointMixin(object):
+ """
+ Helper for creating CheckpointManagers that can be used with any
+ ContentsManager.
+
+ Provides an implementation of `create_checkpoint` and `restore_checkpoint`
+ in terms of the following operations:
+
+ create_file_checkpoint(self, content, format, path)
+ create_notebook_checkpoint(self, nb, path)
+ get_checkpoint(self, checkpoint_id, path, type)
+
+ **Any** valid CheckpointManager implementation should also be valid when
+ this mixin is applied.
+ """
+
+ def create_checkpoint(self, contents_mgr, path):
+ model = contents_mgr.get(path, content=True)
+ type = model['type']
+ if type == 'notebook':
+ return self.create_notebook_checkpoint(
+ model['content'],
+ path,
+ )
+ elif type == 'file':
+ return self.create_file_checkpoint(
+ model['content'],
+ model['format'],
+ path,
+ )
+
+ def restore_checkpoint(self, contents_mgr, checkpoint_id, path):
+ """Restore a checkpoint."""
+ type = contents_mgr.get(path, content=False)['type']
+ model = self.get_checkpoint(checkpoint_id, path, type)
+ contents_mgr.save(model, path)
+
+ # Required Methods
+ def create_file_checkpoint(self, content, format, path):
+ """Create a checkpoint of the current state of a file
+
+ Returns a checkpoint model for the new checkpoint.
+ """
+ raise NotImplementedError("must be implemented in a subclass")
+
+ def create_notebook_checkpoint(self, nb, path):
+ """Create a checkpoint of the current state of a notebook
+
+ Returns a checkpoint model for the new checkpoint.
+ """
+ raise NotImplementedError("must be implemented in a subclass")
+
+ def get_checkpoint(self, checkpoint_id, path, type):
+ """Get the content of a checkpoint.
+
+ Returns an unvalidated model with the same structure as
+ the return value of ContentsManager.get
+ """
+ raise NotImplementedError("must be implemented in a subclass")
diff --git a/IPython/html/services/contents/filecheckpoints.py b/IPython/html/services/contents/filecheckpoints.py
new file mode 100644
index 0000000..e1e4f8c
--- /dev/null
+++ b/IPython/html/services/contents/filecheckpoints.py
@@ -0,0 +1,198 @@
+"""
+File-based CheckpointManagers.
+"""
+import os
+import shutil
+
+from tornado.web import HTTPError
+
+from .checkpoints import (
+ CheckpointManager,
+ GenericCheckpointMixin,
+)
+from .fileio import FileManagerMixin
+
+from IPython.utils import tz
+from IPython.utils.path import ensure_dir_exists
+from IPython.utils.py3compat import getcwd
+from IPython.utils.traitlets import Unicode
+
+
+class FileCheckpointManager(FileManagerMixin, CheckpointManager):
+ """
+ A CheckpointManager that caches checkpoints for files in adjacent
+ directories.
+
+ Only works with FileContentsManager. Use GenericFileCheckpointManager if
+ you want file-based checkpoints with another ContentsManager.
+ """
+
+ checkpoint_dir = Unicode(
+ '.ipynb_checkpoints',
+ config=True,
+ help="""The directory name in which to keep file checkpoints
+
+ This is a path relative to the file's own directory.
+
+ By default, it is .ipynb_checkpoints
+ """,
+ )
+
+ root_dir = Unicode(config=True)
+
+ def _root_dir_default(self):
+ try:
+ return self.parent.root_dir
+ except AttributeError:
+ return getcwd()
+
+ # ContentsManager-dependent checkpoint API
+ def create_checkpoint(self, contents_mgr, path):
+ """Create a checkpoint."""
+ checkpoint_id = u'checkpoint'
+ src_path = contents_mgr._get_os_path(path)
+ dest_path = self.checkpoint_path(checkpoint_id, path)
+ self._copy(src_path, dest_path)
+ return self.checkpoint_model(checkpoint_id, dest_path)
+
+ def restore_checkpoint(self, contents_mgr, checkpoint_id, path):
+ """Restore a checkpoint."""
+ src_path = self.checkpoint_path(checkpoint_id, path)
+ dest_path = contents_mgr._get_os_path(path)
+ self._copy(src_path, dest_path)
+
+ # ContentsManager-independent checkpoint API
+ def rename_checkpoint(self, checkpoint_id, old_path, new_path):
+ """Rename a checkpoint from old_path to new_path."""
+ old_cp_path = self.checkpoint_path(checkpoint_id, old_path)
+ new_cp_path = self.checkpoint_path(checkpoint_id, new_path)
+ if os.path.isfile(old_cp_path):
+ self.log.debug(
+ "Renaming checkpoint %s -> %s",
+ old_cp_path,
+ new_cp_path,
+ )
+ with self.perm_to_403():
+ shutil.move(old_cp_path, new_cp_path)
+
+ def delete_checkpoint(self, checkpoint_id, path):
+ """delete a file's checkpoint"""
+ path = path.strip('/')
+ cp_path = self.checkpoint_path(checkpoint_id, path)
+ if not os.path.isfile(cp_path):
+ self.no_such_checkpoint(path, checkpoint_id)
+
+ self.log.debug("unlinking %s", cp_path)
+ with self.perm_to_403():
+ os.unlink(cp_path)
+
+ def list_checkpoints(self, path):
+ """list the checkpoints for a given file
+
+ This checkpoint manager currently only supports one checkpoint per file.
+ """
+ path = path.strip('/')
+ checkpoint_id = "checkpoint"
+ os_path = self.checkpoint_path(checkpoint_id, path)
+ if not os.path.isfile(os_path):
+ return []
+ else:
+ return [self.checkpoint_model(checkpoint_id, os_path)]
+
+ # Checkpoint-related utilities
+ def checkpoint_path(self, checkpoint_id, path):
+ """find the path to a checkpoint"""
+ path = path.strip('/')
+ parent, name = ('/' + path).rsplit('/', 1)
+ parent = parent.strip('/')
+ basename, ext = os.path.splitext(name)
+ filename = u"{name}-{checkpoint_id}{ext}".format(
+ name=basename,
+ checkpoint_id=checkpoint_id,
+ ext=ext,
+ )
+ os_path = self._get_os_path(path=parent)
+ cp_dir = os.path.join(os_path, self.checkpoint_dir)
+ with self.perm_to_403():
+ ensure_dir_exists(cp_dir)
+ cp_path = os.path.join(cp_dir, filename)
+ return cp_path
+
+ def checkpoint_model(self, checkpoint_id, os_path):
+ """construct the info dict for a given checkpoint"""
+ stats = os.stat(os_path)
+ last_modified = tz.utcfromtimestamp(stats.st_mtime)
+ info = dict(
+ id=checkpoint_id,
+ last_modified=last_modified,
+ )
+ return info
+
+ # Error Handling
+ def no_such_checkpoint(self, path, checkpoint_id):
+ raise HTTPError(
+ 404,
+ u'Checkpoint does not exist: %s@%s' % (path, checkpoint_id)
+ )
+
+
+class GenericFileCheckpointManager(GenericCheckpointMixin,
+ FileCheckpointManager):
+ """
+ Local filesystem CheckpointManager that works with any conforming
+ ContentsManager.
+ """
+ def create_file_checkpoint(self, content, format, path):
+ """Create a checkpoint from the current content of a file."""
+ path = path.strip('/')
+ # only the one checkpoint ID:
+ checkpoint_id = u"checkpoint"
+ os_checkpoint_path = self.checkpoint_path(checkpoint_id, path)
+ self.log.debug("creating checkpoint for %s", path)
+ with self.perm_to_403():
+ self._save_file(os_checkpoint_path, content, format=format)
+
+ # return the checkpoint info
+ return self.checkpoint_model(checkpoint_id, os_checkpoint_path)
+
+ def create_notebook_checkpoint(self, nb, path):
+ """Create a checkpoint from the current content of a notebook."""
+ path = path.strip('/')
+ # only the one checkpoint ID:
+ checkpoint_id = u"checkpoint"
+ os_checkpoint_path = self.checkpoint_path(checkpoint_id, path)
+ self.log.debug("creating checkpoint for %s", path)
+ with self.perm_to_403():
+ self._save_notebook(os_checkpoint_path, nb)
+
+ # return the checkpoint info
+ return self.checkpoint_model(checkpoint_id, os_checkpoint_path)
+
+ def get_checkpoint(self, checkpoint_id, path, type):
+ """Get the content of a checkpoint.
+
+ Returns a model suitable for passing to ContentsManager.save.
+ """
+ path = path.strip('/')
+ self.log.info("restoring %s from checkpoint %s", path, checkpoint_id)
+ os_checkpoint_path = self.checkpoint_path(checkpoint_id, path)
+ if not os.path.isfile(os_checkpoint_path):
+ self.no_such_checkpoint(path, checkpoint_id)
+
+ if type == 'notebook':
+ return {
+ 'type': type,
+ 'content': self._read_notebook(
+ os_checkpoint_path,
+ as_version=4,
+ ),
+ }
+ elif type == 'file':
+ content, format = self._read_file(os_checkpoint_path, format=None)
+ return {
+ 'type': type,
+ 'content': content,
+ 'format': format,
+ }
+ else:
+ raise HTTPError(500, u'Unexpected type %s' % type)
diff --git a/IPython/html/services/contents/fileio.py b/IPython/html/services/contents/fileio.py
new file mode 100644
index 0000000..3b646aa
--- /dev/null
+++ b/IPython/html/services/contents/fileio.py
@@ -0,0 +1,166 @@
+"""
+Utilities for file-based Contents/Checkpoints managers.
+"""
+
+# Copyright (c) IPython Development Team.
+# Distributed under the terms of the Modified BSD License.
+
+import base64
+from contextlib import contextmanager
+import errno
+import io
+import os
+import shutil
+
+from tornado.web import HTTPError
+
+from IPython.html.utils import (
+ to_api_path,
+ to_os_path,
+)
+from IPython import nbformat
+from IPython.utils.io import atomic_writing
+from IPython.utils.py3compat import str_to_unicode
+
+
+class FileManagerMixin(object):
+ """
+ Mixin for ContentsAPI classes that interact with the filesystem.
+
+ Provides facilities for reading, writing, and copying both notebooks and
+ generic files.
+
+ Shared by FileContentsManager and FileCheckpointManager.
+
+ Note
+ ----
+ Classes using this mixin must provide the following attributes:
+
+ root_dir : unicode
+ A directory against which API-style paths are to be resolved.
+
+ log : logging.Logger
+ """
+
+ @contextmanager
+ def open(self, os_path, *args, **kwargs):
+ """wrapper around io.open that turns permission errors into 403"""
+ with self.perm_to_403(os_path):
+ with io.open(os_path, *args, **kwargs) as f:
+ yield f
+
+ @contextmanager
+ def atomic_writing(self, os_path, *args, **kwargs):
+ """wrapper around atomic_writing that turns permission errors to 403"""
+ with self.perm_to_403(os_path):
+ with atomic_writing(os_path, *args, **kwargs) as f:
+ yield f
+
+ @contextmanager
+ def perm_to_403(self, os_path=''):
+ """context manager for turning permission errors into 403."""
+ try:
+ yield
+ except OSError as e:
+ if e.errno in {errno.EPERM, errno.EACCES}:
+ # make 403 error message without root prefix
+ # this may not work perfectly on unicode paths on Python 2,
+ # but nobody should be doing that anyway.
+ if not os_path:
+ os_path = str_to_unicode(e.filename or 'unknown file')
+ path = to_api_path(os_path, root=self.root_dir)
+ raise HTTPError(403, u'Permission denied: %s' % path)
+ else:
+ raise
+
+ def _copy(self, src, dest):
+ """copy src to dest
+
+ like shutil.copy2, but log errors in copystat
+ """
+ shutil.copyfile(src, dest)
+ try:
+ shutil.copystat(src, dest)
+ except OSError:
+ self.log.debug("copystat on %s failed", dest, exc_info=True)
+
+ def _get_os_path(self, path):
+ """Given an API path, return its file system path.
+
+ Parameters
+ ----------
+ path : string
+ The relative API path to the named file.
+
+ Returns
+ -------
+ path : string
+ Native, absolute OS path for a file.
+ """
+ return to_os_path(path, self.root_dir)
+
+ def _read_notebook(self, os_path, as_version=4):
+ """Read a notebook from an os path."""
+ with self.open(os_path, 'r', encoding='utf-8') as f:
+ try:
+ return nbformat.read(f, as_version=as_version)
+ except Exception as e:
+ raise HTTPError(
+ 400,
+ u"Unreadable Notebook: %s %r" % (os_path, e),
+ )
+
+ def _save_notebook(self, os_path, nb):
+ """Save a notebook to an os_path."""
+ with self.atomic_writing(os_path, encoding='utf-8') as f:
+ nbformat.write(nb, f, version=nbformat.NO_CONVERT)
+
+ def _read_file(self, os_path, format):
+ """Read a non-notebook file.
+
+ os_path: The path to be read.
+ format:
+ If 'text', the contents will be decoded as UTF-8.
+ If 'base64', the raw bytes contents will be encoded as base64.
+ If not specified, try to decode as UTF-8, and fall back to base64
+ """
+ if not os.path.isfile(os_path):
+ raise HTTPError(400, "Cannot read non-file %s" % os_path)
+
+ with self.open(os_path, 'rb') as f:
+ bcontent = f.read()
+
+ if format is None or format == 'text':
+ # Try to interpret as unicode if format is unknown or if unicode
+ # was explicitly requested.
+ try:
+ return bcontent.decode('utf8'), 'text'
+ except UnicodeError:
+ if format == 'text':
+ raise HTTPError(
+ 400,
+ "%s is not UTF-8 encoded" % os_path,
+ reason='bad format',
+ )
+ return base64.encodestring(bcontent).decode('ascii'), 'base64'
+
+ def _save_file(self, os_path, content, format):
+ """Save content of a generic file."""
+ if format not in {'text', 'base64'}:
+ raise HTTPError(
+ 400,
+ "Must specify format of file contents as 'text' or 'base64'",
+ )
+ try:
+ if format == 'text':
+ bcontent = content.encode('utf8')
+ else:
+ b64_bytes = content.encode('ascii')
+ bcontent = base64.decodestring(b64_bytes)
+ except Exception as e:
+ raise HTTPError(
+ 400, u'Encoding error saving %s: %s' % (os_path, e)
+ )
+
+ with self.atomic_writing(os_path, text=False) as f:
+ f.write(bcontent)
diff --git a/IPython/html/services/contents/filemanager.py b/IPython/html/services/contents/filemanager.py
index 81b64b2..561c4a7 100644
--- a/IPython/html/services/contents/filemanager.py
+++ b/IPython/html/services/contents/filemanager.py
@@ -3,9 +3,7 @@
# Copyright (c) IPython Development Team.
# Distributed under the terms of the Modified BSD License.
-import base64
-from contextlib import contextmanager
-import errno
+
import io
import os
import shutil
@@ -13,25 +11,23 @@ import mimetypes
from tornado import web
-from .manager import (
- CheckpointManager,
- ContentsManager,
-)
+from .filecheckpoints import FileCheckpointManager
+from .fileio import FileManagerMixin
+from .manager import ContentsManager
+
from IPython import nbformat
-from IPython.utils.io import atomic_writing
from IPython.utils.importstring import import_item
-from IPython.utils.path import ensure_dir_exists
from IPython.utils.traitlets import Any, Unicode, Bool, TraitError
-from IPython.utils.py3compat import getcwd, string_types, str_to_unicode
+from IPython.utils.py3compat import getcwd, string_types
from IPython.utils import tz
from IPython.html.utils import (
is_hidden,
to_api_path,
- to_os_path,
)
_script_exporter = None
+
def _post_save_script(model, os_path, contents_manager, **kwargs):
"""convert notebooks to Python script after save with nbconvert
@@ -56,346 +52,6 @@ def _post_save_script(model, os_path, contents_manager, **kwargs):
f.write(script)
-class FileManagerMixin(object):
- """
- Mixin for ContentsAPI classes that interact with the filesystem.
-
- Provides facilities for reading, writing, and copying both notebooks and
- generic files.
-
- Shared by FileContentsManager and FileCheckpointManager.
-
- Note
- ----
- Classes using this mixin must provide the following attributes:
-
- root_dir : unicode
- A directory against against which API-style paths are to be resolved.
-
- log : logging.Logger
- """
-
- @contextmanager
- def open(self, os_path, *args, **kwargs):
- """wrapper around io.open that turns permission errors into 403"""
- with self.perm_to_403(os_path):
- with io.open(os_path, *args, **kwargs) as f:
- yield f
-
- @contextmanager
- def atomic_writing(self, os_path, *args, **kwargs):
- """wrapper around atomic_writing that turns permission errors into 403"""
- with self.perm_to_403(os_path):
- with atomic_writing(os_path, *args, **kwargs) as f:
- yield f
-
- @contextmanager
- def perm_to_403(self, os_path=''):
- """context manager for turning permission errors into 403."""
- try:
- yield
- except OSError as e:
- if e.errno in {errno.EPERM, errno.EACCES}:
- # make 403 error message without root prefix
- # this may not work perfectly on unicode paths on Python 2,
- # but nobody should be doing that anyway.
- if not os_path:
- os_path = str_to_unicode(e.filename or 'unknown file')
- path = to_api_path(os_path, root=self.root_dir)
- raise web.HTTPError(403, u'Permission denied: %s' % path)
- else:
- raise
-
- def _copy(self, src, dest):
- """copy src to dest
-
- like shutil.copy2, but log errors in copystat
- """
- shutil.copyfile(src, dest)
- try:
- shutil.copystat(src, dest)
- except OSError:
- self.log.debug("copystat on %s failed", dest, exc_info=True)
-
- def _get_os_path(self, path):
- """Given an API path, return its file system path.
-
- Parameters
- ----------
- path : string
- The relative API path to the named file.
-
- Returns
- -------
- path : string
- Native, absolute OS path to for a file.
- """
- return to_os_path(path, self.root_dir)
-
- def _read_notebook(self, os_path, as_version=4):
- """Read a notebook from an os path."""
- with self.open(os_path, 'r', encoding='utf-8') as f:
- try:
- return nbformat.read(f, as_version=as_version)
- except Exception as e:
- raise web.HTTPError(
- 400,
- u"Unreadable Notebook: %s %r" % (os_path, e),
- )
-
- def _save_notebook(self, os_path, nb):
- """Save a notebook to an os_path."""
- with self.atomic_writing(os_path, encoding='utf-8') as f:
- nbformat.write(nb, f, version=nbformat.NO_CONVERT)
-
- def _read_file(self, os_path, format):
- """Read a non-notebook file.
-
- os_path: The path to be read.
- format:
- If 'text', the contents will be decoded as UTF-8.
- If 'base64', the raw bytes contents will be encoded as base64.
- If not specified, try to decode as UTF-8, and fall back to base64
- """
- if not os.path.isfile(os_path):
- raise web.HTTPError(400, "Cannot read non-file %s" % os_path)
-
- with self.open(os_path, 'rb') as f:
- bcontent = f.read()
-
- if format is None or format == 'text':
- # Try to interpret as unicode if format is unknown or if unicode
- # was explicitly requested.
- try:
- return bcontent.decode('utf8'), 'text'
- except UnicodeError as e:
- if format == 'text':
- raise web.HTTPError(
- 400,
- "%s is not UTF-8 encoded" % os_path,
- reason='bad format',
- )
- return base64.encodestring(bcontent).decode('ascii'), 'base64'
-
- def _save_file(self, os_path, content, format):
- """Save content of a generic file."""
- if format not in {'text', 'base64'}:
- raise web.HTTPError(
- 400,
- "Must specify format of file contents as 'text' or 'base64'",
- )
- try:
- if format == 'text':
- bcontent = content.encode('utf8')
- else:
- b64_bytes = content.encode('ascii')
- bcontent = base64.decodestring(b64_bytes)
- except Exception as e:
- raise web.HTTPError(400, u'Encoding error saving %s: %s' % (os_path, e))
-
- with self.atomic_writing(os_path, text=False) as f:
- f.write(bcontent)
-
-
-class FileCheckpointManager(FileManagerMixin, CheckpointManager):
- """
- A CheckpointManager that caches checkpoints for files in adjacent
- directories.
- """
-
- checkpoint_dir = Unicode(
- '.ipynb_checkpoints',
- config=True,
- help="""The directory name in which to keep file checkpoints
-
- This is a path relative to the file's own directory.
-
- By default, it is .ipynb_checkpoints
- """,
- )
-
- root_dir = Unicode(config=True)
-
- def _root_dir_default(self):
- try:
- return self.parent.root_dir
- except AttributeError:
- return getcwd()
-
- # ContentsManager-dependent checkpoint API
- def create_checkpoint(self, contents_mgr, path):
- """
- Create a checkpoint.
-
- If contents_mgr is backed by the local filesystem, just copy the
- appropriate file to the checkpoint directory. Otherwise, ask the
- ContentsManager for a model and write it ourselves.
- """
- if contents_mgr.backend == 'local_file':
- # We know that the file is in the local filesystem, so just copy
- # from the base location to our location.
- checkpoint_id = u'checkpoint'
- src_path = contents_mgr._get_os_path(path)
- dest_path = self.checkpoint_path(checkpoint_id, path)
- self._copy(src_path, dest_path)
- return self.checkpoint_model(checkpoint_id, dest_path)
- else:
- return super(FileCheckpointManager, self).create_checkpoint(
- contents_mgr, path,
- )
-
- def restore_checkpoint(self, contents_mgr, checkpoint_id, path):
- """
- Restore a checkpoint.
-
- If contents_mgr is backed by the local filesystem, just copy the
- appropriate file from the checkpoint directory. Otherwise, load the
- model and pass it to ContentsManager.save.
- """
- if contents_mgr.backend == 'local_file':
- # We know that the file is in the local filesystem, so just copy
- # from our base location to the location expected by content
- src_path = self.checkpoint_path(checkpoint_id, path)
- dest_path = contents_mgr._get_os_path(path)
- self._copy(src_path, dest_path)
- else:
- super(FileCheckpointManager, self).restore_checkpoint(
- contents_mgr, checkpoint_id, path
- )
-
- # ContentsManager-independent checkpoint API
- def rename_checkpoint(self, checkpoint_id, old_path, new_path):
- """Rename a checkpoint from old_path to new_path."""
- old_cp_path = self.checkpoint_path(checkpoint_id, old_path)
- new_cp_path = self.checkpoint_path(checkpoint_id, new_path)
- if os.path.isfile(old_cp_path):
- self.log.debug(
- "Renaming checkpoint %s -> %s",
- old_cp_path,
- new_cp_path,
- )
- with self.perm_to_403():
- shutil.move(old_cp_path, new_cp_path)
-
- def delete_checkpoint(self, checkpoint_id, path):
- """delete a file's checkpoint"""
- path = path.strip('/')
- cp_path = self.checkpoint_path(checkpoint_id, path)
- if not os.path.isfile(cp_path):
- self.no_such_checkpoint(path, checkpoint_id)
-
- self.log.debug("unlinking %s", cp_path)
- with self.perm_to_403():
- os.unlink(cp_path)
-
- def list_checkpoints(self, path):
- """list the checkpoints for a given file
-
- This contents manager currently only supports one checkpoint per file.
- """
- path = path.strip('/')
- checkpoint_id = "checkpoint"
- os_path = self.checkpoint_path(checkpoint_id, path)
- if not os.path.isfile(os_path):
- return []
- else:
- return [self.checkpoint_model(checkpoint_id, os_path)]
-
- # Checkpoint-related utilities
- def checkpoint_path(self, checkpoint_id, path):
- """find the path to a checkpoint"""
- path = path.strip('/')
- parent, name = ('/' + path).rsplit('/', 1)
- parent = parent.strip('/')
- basename, ext = os.path.splitext(name)
- filename = u"{name}-{checkpoint_id}{ext}".format(
- name=basename,
- checkpoint_id=checkpoint_id,
- ext=ext,
- )
- os_path = self._get_os_path(path=parent)
- cp_dir = os.path.join(os_path, self.checkpoint_dir)
- with self.perm_to_403():
- ensure_dir_exists(cp_dir)
- cp_path = os.path.join(cp_dir, filename)
- return cp_path
-
- def checkpoint_model(self, checkpoint_id, os_path):
- """construct the info dict for a given checkpoint"""
- stats = os.stat(os_path)
- last_modified = tz.utcfromtimestamp(stats.st_mtime)
- info = dict(
- id=checkpoint_id,
- last_modified=last_modified,
- )
- return info
-
- def create_file_checkpoint(self, content, format, path):
- """Create a checkpoint from the current content of a notebook."""
- path = path.strip('/')
- # only the one checkpoint ID:
- checkpoint_id = u"checkpoint"
- os_checkpoint_path = self.checkpoint_path(checkpoint_id, path)
- self.log.debug("creating checkpoint for %s", path)
- with self.perm_to_403():
- self._save_file(os_checkpoint_path, content, format=format)
-
- # return the checkpoint info
- return self.checkpoint_model(checkpoint_id, os_checkpoint_path)
-
- def create_notebook_checkpoint(self, nb, path):
- """Create a checkpoint from the current content of a notebook."""
- path = path.strip('/')
- # only the one checkpoint ID:
- checkpoint_id = u"checkpoint"
- os_checkpoint_path = self.checkpoint_path(checkpoint_id, path)
- self.log.debug("creating checkpoint for %s", path)
- with self.perm_to_403():
- self._save_notebook(os_checkpoint_path, nb)
-
- # return the checkpoint info
- return self.checkpoint_model(checkpoint_id, os_checkpoint_path)
-
- def get_checkpoint(self, checkpoint_id, path, type):
- """Get the content of a checkpoint.
-
- Returns a model suitable for passing to ContentsManager.save.
- """
- path = path.strip('/')
- self.log.info("restoring %s from checkpoint %s", path, checkpoint_id)
- os_checkpoint_path = self.checkpoint_path(checkpoint_id, path)
- if not os.path.isfile(os_checkpoint_path):
- self.no_such_checkpoint(path, checkpoint_id)
-
- if type == 'notebook':
- return {
- 'type': type,
- 'content': self._read_notebook(
- os_checkpoint_path,
- as_version=4,
- ),
- }
- elif type == 'file':
- content, format = self._read_file(os_checkpoint_path, format=None)
- return {
- 'type': type,
- 'content': content,
- 'format': format,
- }
- else:
- raise web.HTTPError(
- 500,
- u'Unexpected type %s' % type
- )
-
- # Error Handling
- def no_such_checkpoint(self, path, checkpoint_id):
- raise web.HTTPError(
- 404,
- u'Checkpoint does not exist: %s@%s' % (path, checkpoint_id)
- )
-
-
class FileContentsManager(FileManagerMixin, ContentsManager):
root_dir = Unicode(config=True)
@@ -468,9 +124,6 @@ class FileContentsManager(FileManagerMixin, ContentsManager):
def _checkpoint_manager_class_default(self):
return FileCheckpointManager
- def _backend_default(self):
- return 'local_file'
-
def is_hidden(self, path):
"""Does the API style path correspond to a hidden directory or file?
diff --git a/IPython/html/services/contents/manager.py b/IPython/html/services/contents/manager.py
index 6b25dc1..6d2f229 100644
--- a/IPython/html/services/contents/manager.py
+++ b/IPython/html/services/contents/manager.py
@@ -11,6 +11,7 @@ import re
from tornado.web import HTTPError
+from .checkpoints import CheckpointManager
from IPython.config.configurable import LoggingConfigurable
from IPython.nbformat import sign, validate, ValidationError
from IPython.nbformat.v4 import new_notebook
@@ -29,77 +30,6 @@ from IPython.utils.py3compat import string_types
copy_pat = re.compile(r'\-Copy\d*\.')
-class CheckpointManager(LoggingConfigurable):
- """
- Base class for managing checkpoints for a ContentsManager.
- """
-
- def create_checkpoint(self, contents_mgr, path):
- model = contents_mgr.get(path, content=True)
- type = model['type']
- if type == 'notebook':
- return self.create_notebook_checkpoint(
- model['content'],
- path,
- )
- elif type == 'file':
- return self.create_file_checkpoint(
- model['content'],
- model['format'],
- path,
- )
-
- def restore_checkpoint(self, contents_mgr, checkpoint_id, path):
- """Restore a checkpoint."""
- type = contents_mgr.get(path, content=False)['type']
- model = self.get_checkpoint(checkpoint_id, path, type)
- contents_mgr.save(model, path)
-
- def create_file_checkpoint(self, content, format, path):
- """Create a checkpoint of the current state of a file
-
- Returns a checkpoint model for the new checkpoint.
- """
- raise NotImplementedError("must be implemented in a subclass")
-
- def create_notebook_checkpoint(self, nb, path):
- """Create a checkpoint of the current state of a file
-
- Returns a checkpoint model for the new checkpoint.
- """
- raise NotImplementedError("must be implemented in a subclass")
-
- def get_checkpoint(self, checkpoint_id, path, type):
- """Get the content of a checkpoint.
-
- Returns an unvalidated model with the same structure as
- the return value of ContentsManager.get
- """
- raise NotImplementedError("must be implemented in a subclass")
-
- def rename_checkpoint(self, checkpoint_id, old_path, new_path):
- """Rename a single checkpoint from old_path to new_path."""
- raise NotImplementedError("must be implemented in a subclass")
-
- def delete_checkpoint(self, checkpoint_id, path):
- """delete a checkpoint for a file"""
- raise NotImplementedError("must be implemented in a subclass")
-
- def list_checkpoints(self, path):
- """Return a list of checkpoints for a given file"""
- raise NotImplementedError("must be implemented in a subclass")
-
- def rename_all_checkpoints(self, old_path, new_path):
- """Rename all checkpoints for old_path to new_path."""
- for cp in self.list_checkpoints(old_path):
- self.rename_checkpoint(cp['id'], old_path, new_path)
-
- def delete_all_checkpoints(self, path):
- """Delete all checkpoints for the given path."""
- for checkpoint in self.list_checkpoints(path):
- self.delete_checkpoint(checkpoint['id'], path)
-
-
class ContentsManager(LoggingConfigurable):
"""Base class for serving files and directories.
@@ -180,7 +110,6 @@ class ContentsManager(LoggingConfigurable):
checkpoint_manager_class = Type(CheckpointManager, config=True)
checkpoint_manager = Instance(CheckpointManager, config=True)
checkpoint_manager_kwargs = Dict(allow_none=False, config=True)
- backend = Unicode(default_value="")
def _checkpoint_manager_default(self):
return self.checkpoint_manager_class(**self.checkpoint_manager_kwargs)
diff --git a/IPython/html/services/contents/tests/test_contents_api.py b/IPython/html/services/contents/tests/test_contents_api.py
index b19a8fa..d45f83d 100644
--- a/IPython/html/services/contents/tests/test_contents_api.py
+++ b/IPython/html/services/contents/tests/test_contents_api.py
@@ -13,6 +13,9 @@ pjoin = os.path.join
import requests
+from ..filecheckpoints import GenericFileCheckpointManager
+
+from IPython.config import Config
from IPython.html.utils import url_path_join, url_escape, to_os_path
from IPython.html.tests.launchnotebook import NotebookTestBase, assert_http_error
from IPython.nbformat import read, write, from_dict
@@ -615,24 +618,12 @@ class APITest(NotebookTestBase):
with self.patch_cp_root(td):
self.test_file_checkpoints()
- @contextmanager
- def patch_cm_backend(self):
- """
- Temporarily patch our ContentsManager to present a different backend.
- """
- mgr = self.notebook.contents_manager
- old_backend = mgr.backend
- mgr.backend = ""
- try:
- yield
- finally:
- mgr.backend = old_backend
-
- def test_checkpoints_empty_backend(self):
- with self.patch_cm_backend():
- self.test_checkpoints()
-
- with self.patch_cm_backend():
- self.test_file_checkpoints()
+class GenericFileCheckpointsAPITest(APITest):
+ """
+ Run the tests from APITest with GenericFileCheckpointManager.
+ """
+ config = Config()
+ config.FileContentsManager.checkpoint_manager_class = \
+ GenericFileCheckpointManager