From 4cce9bcf15119477b7b504187fdd62db0cc39752 2015-01-08 19:07:17 From: Scott Sanderson Date: 2015-01-08 19:07:17 Subject: [PATCH] DEV: Separate FileCheckpointManager and GenericFileCheckpointManager. - Adds a `GenericCheckpointMixin` as a helper for implementing the two boundary-traversing Checkpoint API methods, `create_checkpoint` and `restore_checkpoint`. - `GenericFileCheckpointManager` is implemented as a subclass of `FileCheckpointManager` using `GenericCheckpointMixin`. Note that this is the safe subtyping relationship because of method signature *contra*variance: `FileCheckpointManager` accepts `FileContentsManager` in its method signatures type, whereas `GenericFileCheckpointManager` accepts any `ContentsManager`. - Moved Checkpoint-related classes to their own files. --- diff --git a/IPython/html/services/contents/checkpoints.py b/IPython/html/services/contents/checkpoints.py new file mode 100644 index 0000000..0ba546f --- /dev/null +++ b/IPython/html/services/contents/checkpoints.py @@ -0,0 +1,112 @@ +""" +Classes for managing Checkpoints. +""" + +# Copyright (c) IPython Development Team. +# Distributed under the terms of the Modified BSD License. + +from IPython.config.configurable import LoggingConfigurable + + +class CheckpointManager(LoggingConfigurable): + """ + Base class for managing checkpoints for a ContentsManager. + + Subclasses are required to implement: + + create_checkpoint(self, contents_mgr, path) + restore_checkpoint(self, contents_mgr, checkpoint_id, path) + rename_checkpoint(self, checkpoint_id, old_path, new_path) + delete_checkpoint(self, checkpoint_id, path) + list_checkpoints(self, path) + """ + def create_checkpoint(self, contents_mgr, path): + """Create a checkpoint.""" + raise NotImplementedError("must be implemented in a subclass") + + def restore_checkpoint(self, contents_mgr, checkpoint_id, path): + """Restore a checkpoint""" + raise NotImplementedError("must be implemented in a subclass") + + def rename_checkpoint(self, checkpoint_id, old_path, new_path): + """Rename a single checkpoint from old_path to new_path.""" + raise NotImplementedError("must be implemented in a subclass") + + def delete_checkpoint(self, checkpoint_id, path): + """delete a checkpoint for a file""" + raise NotImplementedError("must be implemented in a subclass") + + def list_checkpoints(self, path): + """Return a list of checkpoints for a given file""" + raise NotImplementedError("must be implemented in a subclass") + + def rename_all_checkpoints(self, old_path, new_path): + """Rename all checkpoints for old_path to new_path.""" + for cp in self.list_checkpoints(old_path): + self.rename_checkpoint(cp['id'], old_path, new_path) + + def delete_all_checkpoints(self, path): + """Delete all checkpoints for the given path.""" + for checkpoint in self.list_checkpoints(path): + self.delete_checkpoint(checkpoint['id'], path) + + +class GenericCheckpointMixin(object): + """ + Helper for creating CheckpointManagers that can be used with any + ContentsManager. + + Provides an implementation of `create_checkpoint` and `restore_checkpoint` + in terms of the following operations: + + create_file_checkpoint(self, content, format, path) + create_notebook_checkpoint(self, nb, path) + get_checkpoint(self, checkpoint_id, path, type) + + **Any** valid CheckpointManager implementation should also be valid when + this mixin is applied. + """ + + def create_checkpoint(self, contents_mgr, path): + model = contents_mgr.get(path, content=True) + type = model['type'] + if type == 'notebook': + return self.create_notebook_checkpoint( + model['content'], + path, + ) + elif type == 'file': + return self.create_file_checkpoint( + model['content'], + model['format'], + path, + ) + + def restore_checkpoint(self, contents_mgr, checkpoint_id, path): + """Restore a checkpoint.""" + type = contents_mgr.get(path, content=False)['type'] + model = self.get_checkpoint(checkpoint_id, path, type) + contents_mgr.save(model, path) + + # Required Methods + def create_file_checkpoint(self, content, format, path): + """Create a checkpoint of the current state of a file + + Returns a checkpoint model for the new checkpoint. + """ + raise NotImplementedError("must be implemented in a subclass") + + def create_notebook_checkpoint(self, nb, path): + """Create a checkpoint of the current state of a file + + Returns a checkpoint model for the new checkpoint. + """ + raise NotImplementedError("must be implemented in a subclass") + + def get_checkpoint(self, checkpoint_id, path, type): + """Get the content of a checkpoint. + + Returns an unvalidated model with the same structure as + the return value of ContentsManager.get + """ + raise NotImplementedError("must be implemented in a subclass") diff --git a/IPython/html/services/contents/filecheckpoints.py b/IPython/html/services/contents/filecheckpoints.py new file mode 100644 index 0000000..e1e4f8c --- /dev/null +++ b/IPython/html/services/contents/filecheckpoints.py @@ -0,0 +1,198 @@ +""" +File-based CheckpointManagers. +""" +import os +import shutil + +from tornado.web import HTTPError + +from .checkpoints import ( + CheckpointManager, + GenericCheckpointMixin, +) +from .fileio import FileManagerMixin + +from IPython.utils import tz +from IPython.utils.path import ensure_dir_exists +from IPython.utils.py3compat import getcwd +from IPython.utils.traitlets import Unicode + + +class FileCheckpointManager(FileManagerMixin, CheckpointManager): + """ + A CheckpointManager that caches checkpoints for files in adjacent + directories. + + Only works with FileContentsManager. Use GenericFileCheckpointManager if + you want file-based checkpoints with another ContentsManager. + """ + + checkpoint_dir = Unicode( + '.ipynb_checkpoints', + config=True, + help="""The directory name in which to keep file checkpoints + + This is a path relative to the file's own directory. + + By default, it is .ipynb_checkpoints + """, + ) + + root_dir = Unicode(config=True) + + def _root_dir_default(self): + try: + return self.parent.root_dir + except AttributeError: + return getcwd() + + # ContentsManager-dependent checkpoint API + def create_checkpoint(self, contents_mgr, path): + """Create a checkpoint.""" + checkpoint_id = u'checkpoint' + src_path = contents_mgr._get_os_path(path) + dest_path = self.checkpoint_path(checkpoint_id, path) + self._copy(src_path, dest_path) + return self.checkpoint_model(checkpoint_id, dest_path) + + def restore_checkpoint(self, contents_mgr, checkpoint_id, path): + """Restore a checkpoint.""" + src_path = self.checkpoint_path(checkpoint_id, path) + dest_path = contents_mgr._get_os_path(path) + self._copy(src_path, dest_path) + + # ContentsManager-independent checkpoint API + def rename_checkpoint(self, checkpoint_id, old_path, new_path): + """Rename a checkpoint from old_path to new_path.""" + old_cp_path = self.checkpoint_path(checkpoint_id, old_path) + new_cp_path = self.checkpoint_path(checkpoint_id, new_path) + if os.path.isfile(old_cp_path): + self.log.debug( + "Renaming checkpoint %s -> %s", + old_cp_path, + new_cp_path, + ) + with self.perm_to_403(): + shutil.move(old_cp_path, new_cp_path) + + def delete_checkpoint(self, checkpoint_id, path): + """delete a file's checkpoint""" + path = path.strip('/') + cp_path = self.checkpoint_path(checkpoint_id, path) + if not os.path.isfile(cp_path): + self.no_such_checkpoint(path, checkpoint_id) + + self.log.debug("unlinking %s", cp_path) + with self.perm_to_403(): + os.unlink(cp_path) + + def list_checkpoints(self, path): + """list the checkpoints for a given file + + This contents manager currently only supports one checkpoint per file. + """ + path = path.strip('/') + checkpoint_id = "checkpoint" + os_path = self.checkpoint_path(checkpoint_id, path) + if not os.path.isfile(os_path): + return [] + else: + return [self.checkpoint_model(checkpoint_id, os_path)] + + # Checkpoint-related utilities + def checkpoint_path(self, checkpoint_id, path): + """find the path to a checkpoint""" + path = path.strip('/') + parent, name = ('/' + path).rsplit('/', 1) + parent = parent.strip('/') + basename, ext = os.path.splitext(name) + filename = u"{name}-{checkpoint_id}{ext}".format( + name=basename, + checkpoint_id=checkpoint_id, + ext=ext, + ) + os_path = self._get_os_path(path=parent) + cp_dir = os.path.join(os_path, self.checkpoint_dir) + with self.perm_to_403(): + ensure_dir_exists(cp_dir) + cp_path = os.path.join(cp_dir, filename) + return cp_path + + def checkpoint_model(self, checkpoint_id, os_path): + """construct the info dict for a given checkpoint""" + stats = os.stat(os_path) + last_modified = tz.utcfromtimestamp(stats.st_mtime) + info = dict( + id=checkpoint_id, + last_modified=last_modified, + ) + return info + + # Error Handling + def no_such_checkpoint(self, path, checkpoint_id): + raise HTTPError( + 404, + u'Checkpoint does not exist: %s@%s' % (path, checkpoint_id) + ) + + +class GenericFileCheckpointManager(GenericCheckpointMixin, + FileCheckpointManager): + """ + Local filesystem CheckpointManager that works with any conforming + ContentsManager. + """ + def create_file_checkpoint(self, content, format, path): + """Create a checkpoint from the current content of a notebook.""" + path = path.strip('/') + # only the one checkpoint ID: + checkpoint_id = u"checkpoint" + os_checkpoint_path = self.checkpoint_path(checkpoint_id, path) + self.log.debug("creating checkpoint for %s", path) + with self.perm_to_403(): + self._save_file(os_checkpoint_path, content, format=format) + + # return the checkpoint info + return self.checkpoint_model(checkpoint_id, os_checkpoint_path) + + def create_notebook_checkpoint(self, nb, path): + """Create a checkpoint from the current content of a notebook.""" + path = path.strip('/') + # only the one checkpoint ID: + checkpoint_id = u"checkpoint" + os_checkpoint_path = self.checkpoint_path(checkpoint_id, path) + self.log.debug("creating checkpoint for %s", path) + with self.perm_to_403(): + self._save_notebook(os_checkpoint_path, nb) + + # return the checkpoint info + return self.checkpoint_model(checkpoint_id, os_checkpoint_path) + + def get_checkpoint(self, checkpoint_id, path, type): + """Get the content of a checkpoint. + + Returns a model suitable for passing to ContentsManager.save. + """ + path = path.strip('/') + self.log.info("restoring %s from checkpoint %s", path, checkpoint_id) + os_checkpoint_path = self.checkpoint_path(checkpoint_id, path) + if not os.path.isfile(os_checkpoint_path): + self.no_such_checkpoint(path, checkpoint_id) + + if type == 'notebook': + return { + 'type': type, + 'content': self._read_notebook( + os_checkpoint_path, + as_version=4, + ), + } + elif type == 'file': + content, format = self._read_file(os_checkpoint_path, format=None) + return { + 'type': type, + 'content': content, + 'format': format, + } + else: + raise HTTPError(500, u'Unexpected type %s' % type) diff --git a/IPython/html/services/contents/fileio.py b/IPython/html/services/contents/fileio.py new file mode 100644 index 0000000..3b646aa --- /dev/null +++ b/IPython/html/services/contents/fileio.py @@ -0,0 +1,166 @@ +""" +Utilities for file-based Contents/Checkpoints managers. +""" + +# Copyright (c) IPython Development Team. +# Distributed under the terms of the Modified BSD License. + +import base64 +from contextlib import contextmanager +import errno +import io +import os +import shutil + +from tornado.web import HTTPError + +from IPython.html.utils import ( + to_api_path, + to_os_path, +) +from IPython import nbformat +from IPython.utils.io import atomic_writing +from IPython.utils.py3compat import str_to_unicode + + +class FileManagerMixin(object): + """ + Mixin for ContentsAPI classes that interact with the filesystem. + + Provides facilities for reading, writing, and copying both notebooks and + generic files. + + Shared by FileContentsManager and FileCheckpointManager. + + Note + ---- + Classes using this mixin must provide the following attributes: + + root_dir : unicode + A directory against against which API-style paths are to be resolved. + + log : logging.Logger + """ + + @contextmanager + def open(self, os_path, *args, **kwargs): + """wrapper around io.open that turns permission errors into 403""" + with self.perm_to_403(os_path): + with io.open(os_path, *args, **kwargs) as f: + yield f + + @contextmanager + def atomic_writing(self, os_path, *args, **kwargs): + """wrapper around atomic_writing that turns permission errors to 403""" + with self.perm_to_403(os_path): + with atomic_writing(os_path, *args, **kwargs) as f: + yield f + + @contextmanager + def perm_to_403(self, os_path=''): + """context manager for turning permission errors into 403.""" + try: + yield + except OSError as e: + if e.errno in {errno.EPERM, errno.EACCES}: + # make 403 error message without root prefix + # this may not work perfectly on unicode paths on Python 2, + # but nobody should be doing that anyway. + if not os_path: + os_path = str_to_unicode(e.filename or 'unknown file') + path = to_api_path(os_path, root=self.root_dir) + raise HTTPError(403, u'Permission denied: %s' % path) + else: + raise + + def _copy(self, src, dest): + """copy src to dest + + like shutil.copy2, but log errors in copystat + """ + shutil.copyfile(src, dest) + try: + shutil.copystat(src, dest) + except OSError: + self.log.debug("copystat on %s failed", dest, exc_info=True) + + def _get_os_path(self, path): + """Given an API path, return its file system path. + + Parameters + ---------- + path : string + The relative API path to the named file. + + Returns + ------- + path : string + Native, absolute OS path to for a file. + """ + return to_os_path(path, self.root_dir) + + def _read_notebook(self, os_path, as_version=4): + """Read a notebook from an os path.""" + with self.open(os_path, 'r', encoding='utf-8') as f: + try: + return nbformat.read(f, as_version=as_version) + except Exception as e: + raise HTTPError( + 400, + u"Unreadable Notebook: %s %r" % (os_path, e), + ) + + def _save_notebook(self, os_path, nb): + """Save a notebook to an os_path.""" + with self.atomic_writing(os_path, encoding='utf-8') as f: + nbformat.write(nb, f, version=nbformat.NO_CONVERT) + + def _read_file(self, os_path, format): + """Read a non-notebook file. + + os_path: The path to be read. + format: + If 'text', the contents will be decoded as UTF-8. + If 'base64', the raw bytes contents will be encoded as base64. + If not specified, try to decode as UTF-8, and fall back to base64 + """ + if not os.path.isfile(os_path): + raise HTTPError(400, "Cannot read non-file %s" % os_path) + + with self.open(os_path, 'rb') as f: + bcontent = f.read() + + if format is None or format == 'text': + # Try to interpret as unicode if format is unknown or if unicode + # was explicitly requested. + try: + return bcontent.decode('utf8'), 'text' + except UnicodeError: + if format == 'text': + raise HTTPError( + 400, + "%s is not UTF-8 encoded" % os_path, + reason='bad format', + ) + return base64.encodestring(bcontent).decode('ascii'), 'base64' + + def _save_file(self, os_path, content, format): + """Save content of a generic file.""" + if format not in {'text', 'base64'}: + raise HTTPError( + 400, + "Must specify format of file contents as 'text' or 'base64'", + ) + try: + if format == 'text': + bcontent = content.encode('utf8') + else: + b64_bytes = content.encode('ascii') + bcontent = base64.decodestring(b64_bytes) + except Exception as e: + raise HTTPError( + 400, u'Encoding error saving %s: %s' % (os_path, e) + ) + + with self.atomic_writing(os_path, text=False) as f: + f.write(bcontent) diff --git a/IPython/html/services/contents/filemanager.py b/IPython/html/services/contents/filemanager.py index 81b64b2..561c4a7 100644 --- a/IPython/html/services/contents/filemanager.py +++ b/IPython/html/services/contents/filemanager.py @@ -3,9 +3,7 @@ # Copyright (c) IPython Development Team. # Distributed under the terms of the Modified BSD License. -import base64 -from contextlib import contextmanager -import errno + import io import os import shutil @@ -13,25 +11,23 @@ import mimetypes from tornado import web -from .manager import ( - CheckpointManager, - ContentsManager, -) +from .filecheckpoints import FileCheckpointManager +from .fileio import FileManagerMixin +from .manager import ContentsManager + from IPython import nbformat -from IPython.utils.io import atomic_writing from IPython.utils.importstring import import_item -from IPython.utils.path import ensure_dir_exists from IPython.utils.traitlets import Any, Unicode, Bool, TraitError -from IPython.utils.py3compat import getcwd, string_types, str_to_unicode +from IPython.utils.py3compat import getcwd, string_types from IPython.utils import tz from IPython.html.utils import ( is_hidden, to_api_path, - to_os_path, ) _script_exporter = None + def _post_save_script(model, os_path, contents_manager, **kwargs): """convert notebooks to Python script after save with nbconvert @@ -56,346 +52,6 @@ def _post_save_script(model, os_path, contents_manager, **kwargs): f.write(script) -class FileManagerMixin(object): - """ - Mixin for ContentsAPI classes that interact with the filesystem. - - Provides facilities for reading, writing, and copying both notebooks and - generic files. - - Shared by FileContentsManager and FileCheckpointManager. - - Note - ---- - Classes using this mixin must provide the following attributes: - - root_dir : unicode - A directory against against which API-style paths are to be resolved. - - log : logging.Logger - """ - - @contextmanager - def open(self, os_path, *args, **kwargs): - """wrapper around io.open that turns permission errors into 403""" - with self.perm_to_403(os_path): - with io.open(os_path, *args, **kwargs) as f: - yield f - - @contextmanager - def atomic_writing(self, os_path, *args, **kwargs): - """wrapper around atomic_writing that turns permission errors into 403""" - with self.perm_to_403(os_path): - with atomic_writing(os_path, *args, **kwargs) as f: - yield f - - @contextmanager - def perm_to_403(self, os_path=''): - """context manager for turning permission errors into 403.""" - try: - yield - except OSError as e: - if e.errno in {errno.EPERM, errno.EACCES}: - # make 403 error message without root prefix - # this may not work perfectly on unicode paths on Python 2, - # but nobody should be doing that anyway. - if not os_path: - os_path = str_to_unicode(e.filename or 'unknown file') - path = to_api_path(os_path, root=self.root_dir) - raise web.HTTPError(403, u'Permission denied: %s' % path) - else: - raise - - def _copy(self, src, dest): - """copy src to dest - - like shutil.copy2, but log errors in copystat - """ - shutil.copyfile(src, dest) - try: - shutil.copystat(src, dest) - except OSError: - self.log.debug("copystat on %s failed", dest, exc_info=True) - - def _get_os_path(self, path): - """Given an API path, return its file system path. - - Parameters - ---------- - path : string - The relative API path to the named file. - - Returns - ------- - path : string - Native, absolute OS path to for a file. - """ - return to_os_path(path, self.root_dir) - - def _read_notebook(self, os_path, as_version=4): - """Read a notebook from an os path.""" - with self.open(os_path, 'r', encoding='utf-8') as f: - try: - return nbformat.read(f, as_version=as_version) - except Exception as e: - raise web.HTTPError( - 400, - u"Unreadable Notebook: %s %r" % (os_path, e), - ) - - def _save_notebook(self, os_path, nb): - """Save a notebook to an os_path.""" - with self.atomic_writing(os_path, encoding='utf-8') as f: - nbformat.write(nb, f, version=nbformat.NO_CONVERT) - - def _read_file(self, os_path, format): - """Read a non-notebook file. - - os_path: The path to be read. - format: - If 'text', the contents will be decoded as UTF-8. - If 'base64', the raw bytes contents will be encoded as base64. - If not specified, try to decode as UTF-8, and fall back to base64 - """ - if not os.path.isfile(os_path): - raise web.HTTPError(400, "Cannot read non-file %s" % os_path) - - with self.open(os_path, 'rb') as f: - bcontent = f.read() - - if format is None or format == 'text': - # Try to interpret as unicode if format is unknown or if unicode - # was explicitly requested. - try: - return bcontent.decode('utf8'), 'text' - except UnicodeError as e: - if format == 'text': - raise web.HTTPError( - 400, - "%s is not UTF-8 encoded" % os_path, - reason='bad format', - ) - return base64.encodestring(bcontent).decode('ascii'), 'base64' - - def _save_file(self, os_path, content, format): - """Save content of a generic file.""" - if format not in {'text', 'base64'}: - raise web.HTTPError( - 400, - "Must specify format of file contents as 'text' or 'base64'", - ) - try: - if format == 'text': - bcontent = content.encode('utf8') - else: - b64_bytes = content.encode('ascii') - bcontent = base64.decodestring(b64_bytes) - except Exception as e: - raise web.HTTPError(400, u'Encoding error saving %s: %s' % (os_path, e)) - - with self.atomic_writing(os_path, text=False) as f: - f.write(bcontent) - - -class FileCheckpointManager(FileManagerMixin, CheckpointManager): - """ - A CheckpointManager that caches checkpoints for files in adjacent - directories. - """ - - checkpoint_dir = Unicode( - '.ipynb_checkpoints', - config=True, - help="""The directory name in which to keep file checkpoints - - This is a path relative to the file's own directory. - - By default, it is .ipynb_checkpoints - """, - ) - - root_dir = Unicode(config=True) - - def _root_dir_default(self): - try: - return self.parent.root_dir - except AttributeError: - return getcwd() - - # ContentsManager-dependent checkpoint API - def create_checkpoint(self, contents_mgr, path): - """ - Create a checkpoint. - - If contents_mgr is backed by the local filesystem, just copy the - appropriate file to the checkpoint directory. Otherwise, ask the - ContentsManager for a model and write it ourselves. - """ - if contents_mgr.backend == 'local_file': - # We know that the file is in the local filesystem, so just copy - # from the base location to our location. - checkpoint_id = u'checkpoint' - src_path = contents_mgr._get_os_path(path) - dest_path = self.checkpoint_path(checkpoint_id, path) - self._copy(src_path, dest_path) - return self.checkpoint_model(checkpoint_id, dest_path) - else: - return super(FileCheckpointManager, self).create_checkpoint( - contents_mgr, path, - ) - - def restore_checkpoint(self, contents_mgr, checkpoint_id, path): - """ - Restore a checkpoint. - - If contents_mgr is backed by the local filesystem, just copy the - appropriate file from the checkpoint directory. Otherwise, load the - model and pass it to ContentsManager.save. - """ - if contents_mgr.backend == 'local_file': - # We know that the file is in the local filesystem, so just copy - # from our base location to the location expected by content - src_path = self.checkpoint_path(checkpoint_id, path) - dest_path = contents_mgr._get_os_path(path) - self._copy(src_path, dest_path) - else: - super(FileCheckpointManager, self).restore_checkpoint( - contents_mgr, checkpoint_id, path - ) - - # ContentsManager-independent checkpoint API - def rename_checkpoint(self, checkpoint_id, old_path, new_path): - """Rename a checkpoint from old_path to new_path.""" - old_cp_path = self.checkpoint_path(checkpoint_id, old_path) - new_cp_path = self.checkpoint_path(checkpoint_id, new_path) - if os.path.isfile(old_cp_path): - self.log.debug( - "Renaming checkpoint %s -> %s", - old_cp_path, - new_cp_path, - ) - with self.perm_to_403(): - shutil.move(old_cp_path, new_cp_path) - - def delete_checkpoint(self, checkpoint_id, path): - """delete a file's checkpoint""" - path = path.strip('/') - cp_path = self.checkpoint_path(checkpoint_id, path) - if not os.path.isfile(cp_path): - self.no_such_checkpoint(path, checkpoint_id) - - self.log.debug("unlinking %s", cp_path) - with self.perm_to_403(): - os.unlink(cp_path) - - def list_checkpoints(self, path): - """list the checkpoints for a given file - - This contents manager currently only supports one checkpoint per file. - """ - path = path.strip('/') - checkpoint_id = "checkpoint" - os_path = self.checkpoint_path(checkpoint_id, path) - if not os.path.isfile(os_path): - return [] - else: - return [self.checkpoint_model(checkpoint_id, os_path)] - - # Checkpoint-related utilities - def checkpoint_path(self, checkpoint_id, path): - """find the path to a checkpoint""" - path = path.strip('/') - parent, name = ('/' + path).rsplit('/', 1) - parent = parent.strip('/') - basename, ext = os.path.splitext(name) - filename = u"{name}-{checkpoint_id}{ext}".format( - name=basename, - checkpoint_id=checkpoint_id, - ext=ext, - ) - os_path = self._get_os_path(path=parent) - cp_dir = os.path.join(os_path, self.checkpoint_dir) - with self.perm_to_403(): - ensure_dir_exists(cp_dir) - cp_path = os.path.join(cp_dir, filename) - return cp_path - - def checkpoint_model(self, checkpoint_id, os_path): - """construct the info dict for a given checkpoint""" - stats = os.stat(os_path) - last_modified = tz.utcfromtimestamp(stats.st_mtime) - info = dict( - id=checkpoint_id, - last_modified=last_modified, - ) - return info - - def create_file_checkpoint(self, content, format, path): - """Create a checkpoint from the current content of a notebook.""" - path = path.strip('/') - # only the one checkpoint ID: - checkpoint_id = u"checkpoint" - os_checkpoint_path = self.checkpoint_path(checkpoint_id, path) - self.log.debug("creating checkpoint for %s", path) - with self.perm_to_403(): - self._save_file(os_checkpoint_path, content, format=format) - - # return the checkpoint info - return self.checkpoint_model(checkpoint_id, os_checkpoint_path) - - def create_notebook_checkpoint(self, nb, path): - """Create a checkpoint from the current content of a notebook.""" - path = path.strip('/') - # only the one checkpoint ID: - checkpoint_id = u"checkpoint" - os_checkpoint_path = self.checkpoint_path(checkpoint_id, path) - self.log.debug("creating checkpoint for %s", path) - with self.perm_to_403(): - self._save_notebook(os_checkpoint_path, nb) - - # return the checkpoint info - return self.checkpoint_model(checkpoint_id, os_checkpoint_path) - - def get_checkpoint(self, checkpoint_id, path, type): - """Get the content of a checkpoint. - - Returns a model suitable for passing to ContentsManager.save. - """ - path = path.strip('/') - self.log.info("restoring %s from checkpoint %s", path, checkpoint_id) - os_checkpoint_path = self.checkpoint_path(checkpoint_id, path) - if not os.path.isfile(os_checkpoint_path): - self.no_such_checkpoint(path, checkpoint_id) - - if type == 'notebook': - return { - 'type': type, - 'content': self._read_notebook( - os_checkpoint_path, - as_version=4, - ), - } - elif type == 'file': - content, format = self._read_file(os_checkpoint_path, format=None) - return { - 'type': type, - 'content': content, - 'format': format, - } - else: - raise web.HTTPError( - 500, - u'Unexpected type %s' % type - ) - - # Error Handling - def no_such_checkpoint(self, path, checkpoint_id): - raise web.HTTPError( - 404, - u'Checkpoint does not exist: %s@%s' % (path, checkpoint_id) - ) - - class FileContentsManager(FileManagerMixin, ContentsManager): root_dir = Unicode(config=True) @@ -468,9 +124,6 @@ class FileContentsManager(FileManagerMixin, ContentsManager): def _checkpoint_manager_class_default(self): return FileCheckpointManager - def _backend_default(self): - return 'local_file' - def is_hidden(self, path): """Does the API style path correspond to a hidden directory or file? diff --git a/IPython/html/services/contents/manager.py b/IPython/html/services/contents/manager.py index 6b25dc1..6d2f229 100644 --- a/IPython/html/services/contents/manager.py +++ b/IPython/html/services/contents/manager.py @@ -11,6 +11,7 @@ import re from tornado.web import HTTPError +from .checkpoints import CheckpointManager from IPython.config.configurable import LoggingConfigurable from IPython.nbformat import sign, validate, ValidationError from IPython.nbformat.v4 import new_notebook @@ -29,77 +30,6 @@ from IPython.utils.py3compat import string_types copy_pat = re.compile(r'\-Copy\d*\.') -class CheckpointManager(LoggingConfigurable): - """ - Base class for managing checkpoints for a ContentsManager. - """ - - def create_checkpoint(self, contents_mgr, path): - model = contents_mgr.get(path, content=True) - type = model['type'] - if type == 'notebook': - return self.create_notebook_checkpoint( - model['content'], - path, - ) - elif type == 'file': - return self.create_file_checkpoint( - model['content'], - model['format'], - path, - ) - - def restore_checkpoint(self, contents_mgr, checkpoint_id, path): - """Restore a checkpoint.""" - type = contents_mgr.get(path, content=False)['type'] - model = self.get_checkpoint(checkpoint_id, path, type) - contents_mgr.save(model, path) - - def create_file_checkpoint(self, content, format, path): - """Create a checkpoint of the current state of a file - - Returns a checkpoint model for the new checkpoint. - """ - raise NotImplementedError("must be implemented in a subclass") - - def create_notebook_checkpoint(self, nb, path): - """Create a checkpoint of the current state of a file - - Returns a checkpoint model for the new checkpoint. - """ - raise NotImplementedError("must be implemented in a subclass") - - def get_checkpoint(self, checkpoint_id, path, type): - """Get the content of a checkpoint. - - Returns an unvalidated model with the same structure as - the return value of ContentsManager.get - """ - raise NotImplementedError("must be implemented in a subclass") - - def rename_checkpoint(self, checkpoint_id, old_path, new_path): - """Rename a single checkpoint from old_path to new_path.""" - raise NotImplementedError("must be implemented in a subclass") - - def delete_checkpoint(self, checkpoint_id, path): - """delete a checkpoint for a file""" - raise NotImplementedError("must be implemented in a subclass") - - def list_checkpoints(self, path): - """Return a list of checkpoints for a given file""" - raise NotImplementedError("must be implemented in a subclass") - - def rename_all_checkpoints(self, old_path, new_path): - """Rename all checkpoints for old_path to new_path.""" - for cp in self.list_checkpoints(old_path): - self.rename_checkpoint(cp['id'], old_path, new_path) - - def delete_all_checkpoints(self, path): - """Delete all checkpoints for the given path.""" - for checkpoint in self.list_checkpoints(path): - self.delete_checkpoint(checkpoint['id'], path) - - class ContentsManager(LoggingConfigurable): """Base class for serving files and directories. @@ -180,7 +110,6 @@ class ContentsManager(LoggingConfigurable): checkpoint_manager_class = Type(CheckpointManager, config=True) checkpoint_manager = Instance(CheckpointManager, config=True) checkpoint_manager_kwargs = Dict(allow_none=False, config=True) - backend = Unicode(default_value="") def _checkpoint_manager_default(self): return self.checkpoint_manager_class(**self.checkpoint_manager_kwargs) diff --git a/IPython/html/services/contents/tests/test_contents_api.py b/IPython/html/services/contents/tests/test_contents_api.py index b19a8fa..d45f83d 100644 --- a/IPython/html/services/contents/tests/test_contents_api.py +++ b/IPython/html/services/contents/tests/test_contents_api.py @@ -13,6 +13,9 @@ pjoin = os.path.join import requests +from ..filecheckpoints import GenericFileCheckpointManager + +from IPython.config import Config from IPython.html.utils import url_path_join, url_escape, to_os_path from IPython.html.tests.launchnotebook import NotebookTestBase, assert_http_error from IPython.nbformat import read, write, from_dict @@ -615,24 +618,12 @@ class APITest(NotebookTestBase): with self.patch_cp_root(td): self.test_file_checkpoints() - @contextmanager - def patch_cm_backend(self): - """ - Temporarily patch our ContentsManager to present a different backend. - """ - mgr = self.notebook.contents_manager - old_backend = mgr.backend - mgr.backend = "" - try: - yield - finally: - mgr.backend = old_backend - - def test_checkpoints_empty_backend(self): - with self.patch_cm_backend(): - self.test_checkpoints() - - with self.patch_cm_backend(): - self.test_file_checkpoints() +class GenericFileCheckpointsAPITest(APITest): + """ + Run the tests from APITest with GenericFileCheckpointManager. + """ + config = Config() + config.FileContentsManager.checkpoint_manager_class = \ + GenericFileCheckpointManager