diff --git a/IPython/html/services/contents/filemanager.py b/IPython/html/services/contents/filemanager.py
index 7fe4ee8..81b64b2 100644
--- a/IPython/html/services/contents/filemanager.py
+++ b/IPython/html/services/contents/filemanager.py
@@ -222,59 +222,54 @@ class FileCheckpointManager(FileManagerMixin, CheckpointManager):
         except AttributeError:
             return getcwd()
 
-    # public checkpoint API
-    def create_file_checkpoint(self, content, format, path):
-        """Create a checkpoint from the current content of a notebook."""
-        path = path.strip('/')
-        # only the one checkpoint ID:
-        checkpoint_id = u"checkpoint"
-        os_checkpoint_path = self.checkpoint_path(checkpoint_id, path)
-        self.log.debug("creating checkpoint for %s", path)
-        with self.perm_to_403():
-            self._save_file(os_checkpoint_path, content, format=format)
-
-        # return the checkpoint info
-        return self.checkpoint_model(checkpoint_id, os_checkpoint_path)
-
-    def create_notebook_checkpoint(self, nb, path):
-        """Create a checkpoint from the current content of a notebook."""
-        path = path.strip('/')
-        # only the one checkpoint ID:
-        checkpoint_id = u"checkpoint"
-        os_checkpoint_path = self.checkpoint_path(checkpoint_id, path)
-        self.log.debug("creating checkpoint for %s", path)
-        with self.perm_to_403():
-            self._save_notebook(os_checkpoint_path, nb)
+    # ContentsManager-dependent checkpoint API
+    def create_checkpoint(self, contents_mgr, path):
+        """
+        Create a checkpoint.
 
-        # return the checkpoint info
-        return self.checkpoint_model(checkpoint_id, os_checkpoint_path)
+        If contents_mgr is backed by the local filesystem, just copy the
+        appropriate file to the checkpoint directory. Otherwise, ask the
+        ContentsManager for a model and write it ourselves.
+        """
+        if contents_mgr.backend == 'local_file':
+            # We know that the file is in the local filesystem, so just copy
+            # from the base location to our location.
+            checkpoint_id = u'checkpoint'
+            src_path = contents_mgr._get_os_path(path)
+            dest_path = self.checkpoint_path(checkpoint_id, path)
+            # Surface permission errors as 403, like the model-based path.
+            with self.perm_to_403():
+                self._copy(src_path, dest_path)
+            return self.checkpoint_model(checkpoint_id, dest_path)
+        else:
+            return super(FileCheckpointManager, self).create_checkpoint(
+                contents_mgr, path,
+            )
 
-    def get_checkpoint(self, checkpoint_id, path, type):
-        """Get the content of a checkpoint.
+    def restore_checkpoint(self, contents_mgr, checkpoint_id, path):
+        """
+        Restore a checkpoint.
 
-        Returns a model suitable for passing to ContentsManager.save.
+        If contents_mgr is backed by the local filesystem, just copy the
+        appropriate file from the checkpoint directory. Otherwise, load the
+        model and pass it to ContentsManager.save.
         """
-        path = path.strip('/')
-        self.log.info("restoring %s from checkpoint %s", path, checkpoint_id)
-        os_checkpoint_path = self.checkpoint_path(checkpoint_id, path)
-        if not os.path.isfile(os_checkpoint_path):
-            self.no_such_checkpoint(path, checkpoint_id)
-        if type == 'notebook':
-            return {
-                'type': type,
-                'content': self._read_notebook(
-                    os_checkpoint_path,
-                    as_version=4,
-                ),
-            }
+        if contents_mgr.backend == 'local_file':
+            # We know that the file is in the local filesystem, so just copy
+            # from our base location to the location expected by contents_mgr.
+            src_path = self.checkpoint_path(checkpoint_id, path)
+            # Report a missing checkpoint the same way get_checkpoint does.
+            if not os.path.isfile(src_path):
+                self.no_such_checkpoint(path, checkpoint_id)
+            dest_path = contents_mgr._get_os_path(path)
+            with self.perm_to_403():
+                self._copy(src_path, dest_path)
         else:
-            content, format = self._read_file(os_checkpoint_path, format=None)
-            return {
-                'type': type,
-                'content': content,
-                'format': format,
-            }
+            super(FileCheckpointManager, self).restore_checkpoint(
+                contents_mgr, checkpoint_id, path
+            )
 
+    # ContentsManager-independent checkpoint API
     def rename_checkpoint(self, checkpoint_id, old_path, new_path):
         """Rename a checkpoint from old_path to new_path."""
         old_cp_path = self.checkpoint_path(checkpoint_id, old_path)
@@ -341,6 +336,64 @@ class FileCheckpointManager(FileManagerMixin, CheckpointManager):
         )
         return info
 
+    def create_file_checkpoint(self, content, format, path):
+        """Create a checkpoint from the current content of a file."""
+        path = path.strip('/')
+        # only the one checkpoint ID:
+        checkpoint_id = u"checkpoint"
+        os_checkpoint_path = self.checkpoint_path(checkpoint_id, path)
+        self.log.debug("creating checkpoint for %s", path)
+        with self.perm_to_403():
+            self._save_file(os_checkpoint_path, content, format=format)
+
+        # return the checkpoint info
+        return self.checkpoint_model(checkpoint_id, os_checkpoint_path)
+
+    def create_notebook_checkpoint(self, nb, path):
+        """Create a checkpoint from the current content of a notebook."""
+        path = path.strip('/')
+        # only the one checkpoint ID:
+        checkpoint_id = u"checkpoint"
+        os_checkpoint_path = self.checkpoint_path(checkpoint_id, path)
+        self.log.debug("creating checkpoint for %s", path)
+        with self.perm_to_403():
+            self._save_notebook(os_checkpoint_path, nb)
+
+        # return the checkpoint info
+        return self.checkpoint_model(checkpoint_id, os_checkpoint_path)
+
+    def get_checkpoint(self, checkpoint_id, path, type):
+        """Get the content of a checkpoint.
+
+        Returns a model suitable for passing to ContentsManager.save.
+        """
+        path = path.strip('/')
+        self.log.info("restoring %s from checkpoint %s", path, checkpoint_id)
+        os_checkpoint_path = self.checkpoint_path(checkpoint_id, path)
+        if not os.path.isfile(os_checkpoint_path):
+            self.no_such_checkpoint(path, checkpoint_id)
+
+        if type == 'notebook':
+            return {
+                'type': type,
+                'content': self._read_notebook(
+                    os_checkpoint_path,
+                    as_version=4,
+                ),
+            }
+        elif type == 'file':
+            content, format = self._read_file(os_checkpoint_path, format=None)
+            return {
+                'type': type,
+                'content': content,
+                'format': format,
+            }
+        else:
+            raise web.HTTPError(
+                500,
+                u'Unexpected type %s' % type
+            )
+
     # Error Handling
     def no_such_checkpoint(self, path, checkpoint_id):
         raise web.HTTPError(
@@ -421,6 +474,9 @@ class FileContentsManager(FileManagerMixin, ContentsManager):
     def _checkpoint_manager_class_default(self):
         return FileCheckpointManager
 
+    def _backend_default(self):
+        return 'local_file'
+
     def is_hidden(self, path):
         """Does the API style path correspond to a hidden directory or file?
 
@@ -681,10 +737,7 @@ class FileContentsManager(FileManagerMixin, ContentsManager):
                 self._save_notebook(os_path, nb)
                 # One checkpoint should always exist for notebooks.
                 if not self.checkpoint_manager.list_checkpoints(path):
-                    self.checkpoint_manager.create_notebook_checkpoint(
-                        nb,
-                        path,
-                    )
+                    self.create_checkpoint(path)
             elif model['type'] == 'file':
                 # Missing format will be handled internally by _save_file.
                 self._save_file(os_path, model['content'], model.get('format'))
diff --git a/IPython/html/services/contents/manager.py b/IPython/html/services/contents/manager.py
index ec2f7db..6b25dc1 100644
--- a/IPython/html/services/contents/manager.py
+++ b/IPython/html/services/contents/manager.py
@@ -11,7 +11,6 @@ import re
 
 from tornado.web import HTTPError
 
-from IPython import nbformat
 from IPython.config.configurable import LoggingConfigurable
 from IPython.nbformat import sign, validate, ValidationError
 from IPython.nbformat.v4 import new_notebook
@@ -34,6 +33,32 @@ class CheckpointManager(LoggingConfigurable):
     """
     Base class for managing checkpoints for a ContentsManager.
     """
+
+    def create_checkpoint(self, contents_mgr, path):
+        """Create a checkpoint from the model contents_mgr has at path."""
+        model = contents_mgr.get(path, content=True)
+        type = model['type']
+        if type == 'notebook':
+            return self.create_notebook_checkpoint(
+                model['content'],
+                path,
+            )
+        elif type == 'file':
+            return self.create_file_checkpoint(
+                model['content'],
+                model['format'],
+                path,
+            )
+        else:
+            # Fail loudly instead of silently returning None.
+            raise HTTPError(500, u'Unexpected type %s' % type)
+
+    def restore_checkpoint(self, contents_mgr, checkpoint_id, path):
+        """Restore a checkpoint."""
+        type = contents_mgr.get(path, content=False)['type']
+        model = self.get_checkpoint(checkpoint_id, path, type)
+        contents_mgr.save(model, path)
+
     def create_file_checkpoint(self, content, format, path):
         """Create a checkpoint of the current state of a file
 
@@ -159,6 +184,7 @@ class ContentsManager(LoggingConfigurable):
     checkpoint_manager_class = Type(CheckpointManager, config=True)
     checkpoint_manager = Instance(CheckpointManager, config=True)
     checkpoint_manager_kwargs = Dict(allow_none=False, config=True)
+    backend = Unicode(default_value="")
 
     def _checkpoint_manager_default(self):
         return self.checkpoint_manager_class(**self.checkpoint_manager_kwargs)
@@ -502,35 +528,16 @@ class ContentsManager(LoggingConfigurable):
     # Part 3: Checkpoints API
     def create_checkpoint(self, path):
         """Create a checkpoint."""
-        model = self.get(path, content=True)
-        type = model['type']
-        if type == 'notebook':
-            return self.checkpoint_manager.create_notebook_checkpoint(
-                model['content'],
-                path,
-            )
-        elif type == 'file':
-            return self.checkpoint_manager.create_file_checkpoint(
-                model['content'],
-                model['format'],
-                path,
-            )
-
-    def list_checkpoints(self, path):
-        return self.checkpoint_manager.list_checkpoints(path)
+        return self.checkpoint_manager.create_checkpoint(self, path)
 
     def restore_checkpoint(self, checkpoint_id, path):
         """
         Restore a checkpoint.
         """
-        return self.save(
-            model=self.checkpoint_manager.get_checkpoint(
-                checkpoint_id,
-                path,
-                self.get(path, content=False)['type']
-            ),
-            path=path,
-        )
+        self.checkpoint_manager.restore_checkpoint(self, checkpoint_id, path)
+
+    def list_checkpoints(self, path):
+        return self.checkpoint_manager.list_checkpoints(path)
 
     def delete_checkpoint(self, checkpoint_id, path):
         return self.checkpoint_manager.delete_checkpoint(checkpoint_id, path)
diff --git a/IPython/html/services/contents/tests/test_contents_api.py b/IPython/html/services/contents/tests/test_contents_api.py
index 3c0d33a..b19a8fa 100644
--- a/IPython/html/services/contents/tests/test_contents_api.py
+++ b/IPython/html/services/contents/tests/test_contents_api.py
@@ -614,3 +614,23 @@ class APITest(NotebookTestBase):
         with TemporaryDirectory() as td:
             with self.patch_cp_root(td):
                 self.test_file_checkpoints()
+
+    @contextmanager
+    def patch_cm_backend(self):
+        """
+        Temporarily patch our ContentsManager to present a different backend.
+        """
+        mgr = self.notebook.contents_manager
+        old_backend = mgr.backend
+        mgr.backend = ""
+        try:
+            yield
+        finally:
+            mgr.backend = old_backend
+
+    def test_checkpoints_empty_backend(self):
+        with self.patch_cm_backend():
+            self.test_checkpoints()
+
+        with self.patch_cm_backend():
+            self.test_file_checkpoints()