##// END OF EJS Templates
DEV: Separate FileCheckpointManager and GenericFileCheckpointManager....
Scott Sanderson -
Show More
@@ -0,0 +1,112 b''
1 """
2 Classes for managing Checkpoints.
3 """
4
5 # Copyright (c) IPython Development Team.
6 # Distributed under the terms of the Modified BSD License.
7
8 from IPython.config.configurable import LoggingConfigurable
9
10
11 class CheckpointManager(LoggingConfigurable):
12 """
13 Base class for managing checkpoints for a ContentsManager.
14
15 Subclasses are required to implement:
16
17 create_checkpoint(self, contents_mgr, path)
18 restore_checkpoint(self, contents_mgr, checkpoint_id, path)
19 rename_checkpoint(self, checkpoint_id, old_path, new_path)
20 delete_checkpoint(self, checkpoint_id, path)
21 list_checkpoints(self, path)
22 """
23 def create_checkpoint(self, contents_mgr, path):
24 """Create a checkpoint."""
25 raise NotImplementedError("must be implemented in a subclass")
26
27 def restore_checkpoint(self, contents_mgr, checkpoint_id, path):
28 """Restore a checkpoint"""
29 raise NotImplementedError("must be implemented in a subclass")
30
31 def rename_checkpoint(self, checkpoint_id, old_path, new_path):
32 """Rename a single checkpoint from old_path to new_path."""
33 raise NotImplementedError("must be implemented in a subclass")
34
35 def delete_checkpoint(self, checkpoint_id, path):
36 """delete a checkpoint for a file"""
37 raise NotImplementedError("must be implemented in a subclass")
38
39 def list_checkpoints(self, path):
40 """Return a list of checkpoints for a given file"""
41 raise NotImplementedError("must be implemented in a subclass")
42
43 def rename_all_checkpoints(self, old_path, new_path):
44 """Rename all checkpoints for old_path to new_path."""
45 for cp in self.list_checkpoints(old_path):
46 self.rename_checkpoint(cp['id'], old_path, new_path)
47
48 def delete_all_checkpoints(self, path):
49 """Delete all checkpoints for the given path."""
50 for checkpoint in self.list_checkpoints(path):
51 self.delete_checkpoint(checkpoint['id'], path)
52
53
54 class GenericCheckpointMixin(object):
55 """
56 Helper for creating CheckpointManagers that can be used with any
57 ContentsManager.
58
59 Provides an implementation of `create_checkpoint` and `restore_checkpoint`
60 in terms of the following operations:
61
62 create_file_checkpoint(self, content, format, path)
63 create_notebook_checkpoint(self, nb, path)
64 get_checkpoint(self, checkpoint_id, path, type)
65
66 **Any** valid CheckpointManager implementation should also be valid when
67 this mixin is applied.
68 """
69
70 def create_checkpoint(self, contents_mgr, path):
71 model = contents_mgr.get(path, content=True)
72 type = model['type']
73 if type == 'notebook':
74 return self.create_notebook_checkpoint(
75 model['content'],
76 path,
77 )
78 elif type == 'file':
79 return self.create_file_checkpoint(
80 model['content'],
81 model['format'],
82 path,
83 )
84
85 def restore_checkpoint(self, contents_mgr, checkpoint_id, path):
86 """Restore a checkpoint."""
87 type = contents_mgr.get(path, content=False)['type']
88 model = self.get_checkpoint(checkpoint_id, path, type)
89 contents_mgr.save(model, path)
90
91 # Required Methods
92 def create_file_checkpoint(self, content, format, path):
93 """Create a checkpoint of the current state of a file
94
95 Returns a checkpoint model for the new checkpoint.
96 """
97 raise NotImplementedError("must be implemented in a subclass")
98
99 def create_notebook_checkpoint(self, nb, path):
100 """Create a checkpoint of the current state of a file
101
102 Returns a checkpoint model for the new checkpoint.
103 """
104 raise NotImplementedError("must be implemented in a subclass")
105
106 def get_checkpoint(self, checkpoint_id, path, type):
107 """Get the content of a checkpoint.
108
109 Returns an unvalidated model with the same structure as
110 the return value of ContentsManager.get
111 """
112 raise NotImplementedError("must be implemented in a subclass")
@@ -0,0 +1,198 b''
1 """
2 File-based CheckpointManagers.
3 """
4 import os
5 import shutil
6
7 from tornado.web import HTTPError
8
9 from .checkpoints import (
10 CheckpointManager,
11 GenericCheckpointMixin,
12 )
13 from .fileio import FileManagerMixin
14
15 from IPython.utils import tz
16 from IPython.utils.path import ensure_dir_exists
17 from IPython.utils.py3compat import getcwd
18 from IPython.utils.traitlets import Unicode
19
20
21 class FileCheckpointManager(FileManagerMixin, CheckpointManager):
22 """
23 A CheckpointManager that caches checkpoints for files in adjacent
24 directories.
25
26 Only works with FileContentsManager. Use GenericFileCheckpointManager if
27 you want file-based checkpoints with another ContentsManager.
28 """
29
30 checkpoint_dir = Unicode(
31 '.ipynb_checkpoints',
32 config=True,
33 help="""The directory name in which to keep file checkpoints
34
35 This is a path relative to the file's own directory.
36
37 By default, it is .ipynb_checkpoints
38 """,
39 )
40
41 root_dir = Unicode(config=True)
42
43 def _root_dir_default(self):
44 try:
45 return self.parent.root_dir
46 except AttributeError:
47 return getcwd()
48
49 # ContentsManager-dependent checkpoint API
50 def create_checkpoint(self, contents_mgr, path):
51 """Create a checkpoint."""
52 checkpoint_id = u'checkpoint'
53 src_path = contents_mgr._get_os_path(path)
54 dest_path = self.checkpoint_path(checkpoint_id, path)
55 self._copy(src_path, dest_path)
56 return self.checkpoint_model(checkpoint_id, dest_path)
57
58 def restore_checkpoint(self, contents_mgr, checkpoint_id, path):
59 """Restore a checkpoint."""
60 src_path = self.checkpoint_path(checkpoint_id, path)
61 dest_path = contents_mgr._get_os_path(path)
62 self._copy(src_path, dest_path)
63
64 # ContentsManager-independent checkpoint API
65 def rename_checkpoint(self, checkpoint_id, old_path, new_path):
66 """Rename a checkpoint from old_path to new_path."""
67 old_cp_path = self.checkpoint_path(checkpoint_id, old_path)
68 new_cp_path = self.checkpoint_path(checkpoint_id, new_path)
69 if os.path.isfile(old_cp_path):
70 self.log.debug(
71 "Renaming checkpoint %s -> %s",
72 old_cp_path,
73 new_cp_path,
74 )
75 with self.perm_to_403():
76 shutil.move(old_cp_path, new_cp_path)
77
78 def delete_checkpoint(self, checkpoint_id, path):
79 """delete a file's checkpoint"""
80 path = path.strip('/')
81 cp_path = self.checkpoint_path(checkpoint_id, path)
82 if not os.path.isfile(cp_path):
83 self.no_such_checkpoint(path, checkpoint_id)
84
85 self.log.debug("unlinking %s", cp_path)
86 with self.perm_to_403():
87 os.unlink(cp_path)
88
89 def list_checkpoints(self, path):
90 """list the checkpoints for a given file
91
92 This contents manager currently only supports one checkpoint per file.
93 """
94 path = path.strip('/')
95 checkpoint_id = "checkpoint"
96 os_path = self.checkpoint_path(checkpoint_id, path)
97 if not os.path.isfile(os_path):
98 return []
99 else:
100 return [self.checkpoint_model(checkpoint_id, os_path)]
101
102 # Checkpoint-related utilities
103 def checkpoint_path(self, checkpoint_id, path):
104 """find the path to a checkpoint"""
105 path = path.strip('/')
106 parent, name = ('/' + path).rsplit('/', 1)
107 parent = parent.strip('/')
108 basename, ext = os.path.splitext(name)
109 filename = u"{name}-{checkpoint_id}{ext}".format(
110 name=basename,
111 checkpoint_id=checkpoint_id,
112 ext=ext,
113 )
114 os_path = self._get_os_path(path=parent)
115 cp_dir = os.path.join(os_path, self.checkpoint_dir)
116 with self.perm_to_403():
117 ensure_dir_exists(cp_dir)
118 cp_path = os.path.join(cp_dir, filename)
119 return cp_path
120
121 def checkpoint_model(self, checkpoint_id, os_path):
122 """construct the info dict for a given checkpoint"""
123 stats = os.stat(os_path)
124 last_modified = tz.utcfromtimestamp(stats.st_mtime)
125 info = dict(
126 id=checkpoint_id,
127 last_modified=last_modified,
128 )
129 return info
130
131 # Error Handling
132 def no_such_checkpoint(self, path, checkpoint_id):
133 raise HTTPError(
134 404,
135 u'Checkpoint does not exist: %s@%s' % (path, checkpoint_id)
136 )
137
138
139 class GenericFileCheckpointManager(GenericCheckpointMixin,
140 FileCheckpointManager):
141 """
142 Local filesystem CheckpointManager that works with any conforming
143 ContentsManager.
144 """
145 def create_file_checkpoint(self, content, format, path):
146 """Create a checkpoint from the current content of a notebook."""
147 path = path.strip('/')
148 # only the one checkpoint ID:
149 checkpoint_id = u"checkpoint"
150 os_checkpoint_path = self.checkpoint_path(checkpoint_id, path)
151 self.log.debug("creating checkpoint for %s", path)
152 with self.perm_to_403():
153 self._save_file(os_checkpoint_path, content, format=format)
154
155 # return the checkpoint info
156 return self.checkpoint_model(checkpoint_id, os_checkpoint_path)
157
158 def create_notebook_checkpoint(self, nb, path):
159 """Create a checkpoint from the current content of a notebook."""
160 path = path.strip('/')
161 # only the one checkpoint ID:
162 checkpoint_id = u"checkpoint"
163 os_checkpoint_path = self.checkpoint_path(checkpoint_id, path)
164 self.log.debug("creating checkpoint for %s", path)
165 with self.perm_to_403():
166 self._save_notebook(os_checkpoint_path, nb)
167
168 # return the checkpoint info
169 return self.checkpoint_model(checkpoint_id, os_checkpoint_path)
170
171 def get_checkpoint(self, checkpoint_id, path, type):
172 """Get the content of a checkpoint.
173
174 Returns a model suitable for passing to ContentsManager.save.
175 """
176 path = path.strip('/')
177 self.log.info("restoring %s from checkpoint %s", path, checkpoint_id)
178 os_checkpoint_path = self.checkpoint_path(checkpoint_id, path)
179 if not os.path.isfile(os_checkpoint_path):
180 self.no_such_checkpoint(path, checkpoint_id)
181
182 if type == 'notebook':
183 return {
184 'type': type,
185 'content': self._read_notebook(
186 os_checkpoint_path,
187 as_version=4,
188 ),
189 }
190 elif type == 'file':
191 content, format = self._read_file(os_checkpoint_path, format=None)
192 return {
193 'type': type,
194 'content': content,
195 'format': format,
196 }
197 else:
198 raise HTTPError(500, u'Unexpected type %s' % type)
@@ -0,0 +1,166 b''
1 """
2 Utilities for file-based Contents/Checkpoints managers.
3 """
4
5 # Copyright (c) IPython Development Team.
6 # Distributed under the terms of the Modified BSD License.
7
8 import base64
9 from contextlib import contextmanager
10 import errno
11 import io
12 import os
13 import shutil
14
15 from tornado.web import HTTPError
16
17 from IPython.html.utils import (
18 to_api_path,
19 to_os_path,
20 )
21 from IPython import nbformat
22 from IPython.utils.io import atomic_writing
23 from IPython.utils.py3compat import str_to_unicode
24
25
26 class FileManagerMixin(object):
27 """
28 Mixin for ContentsAPI classes that interact with the filesystem.
29
30 Provides facilities for reading, writing, and copying both notebooks and
31 generic files.
32
33 Shared by FileContentsManager and FileCheckpointManager.
34
35 Note
36 ----
37 Classes using this mixin must provide the following attributes:
38
39 root_dir : unicode
40 A directory against against which API-style paths are to be resolved.
41
42 log : logging.Logger
43 """
44
45 @contextmanager
46 def open(self, os_path, *args, **kwargs):
47 """wrapper around io.open that turns permission errors into 403"""
48 with self.perm_to_403(os_path):
49 with io.open(os_path, *args, **kwargs) as f:
50 yield f
51
52 @contextmanager
53 def atomic_writing(self, os_path, *args, **kwargs):
54 """wrapper around atomic_writing that turns permission errors to 403"""
55 with self.perm_to_403(os_path):
56 with atomic_writing(os_path, *args, **kwargs) as f:
57 yield f
58
59 @contextmanager
60 def perm_to_403(self, os_path=''):
61 """context manager for turning permission errors into 403."""
62 try:
63 yield
64 except OSError as e:
65 if e.errno in {errno.EPERM, errno.EACCES}:
66 # make 403 error message without root prefix
67 # this may not work perfectly on unicode paths on Python 2,
68 # but nobody should be doing that anyway.
69 if not os_path:
70 os_path = str_to_unicode(e.filename or 'unknown file')
71 path = to_api_path(os_path, root=self.root_dir)
72 raise HTTPError(403, u'Permission denied: %s' % path)
73 else:
74 raise
75
76 def _copy(self, src, dest):
77 """copy src to dest
78
79 like shutil.copy2, but log errors in copystat
80 """
81 shutil.copyfile(src, dest)
82 try:
83 shutil.copystat(src, dest)
84 except OSError:
85 self.log.debug("copystat on %s failed", dest, exc_info=True)
86
87 def _get_os_path(self, path):
88 """Given an API path, return its file system path.
89
90 Parameters
91 ----------
92 path : string
93 The relative API path to the named file.
94
95 Returns
96 -------
97 path : string
98 Native, absolute OS path to for a file.
99 """
100 return to_os_path(path, self.root_dir)
101
102 def _read_notebook(self, os_path, as_version=4):
103 """Read a notebook from an os path."""
104 with self.open(os_path, 'r', encoding='utf-8') as f:
105 try:
106 return nbformat.read(f, as_version=as_version)
107 except Exception as e:
108 raise HTTPError(
109 400,
110 u"Unreadable Notebook: %s %r" % (os_path, e),
111 )
112
113 def _save_notebook(self, os_path, nb):
114 """Save a notebook to an os_path."""
115 with self.atomic_writing(os_path, encoding='utf-8') as f:
116 nbformat.write(nb, f, version=nbformat.NO_CONVERT)
117
118 def _read_file(self, os_path, format):
119 """Read a non-notebook file.
120
121 os_path: The path to be read.
122 format:
123 If 'text', the contents will be decoded as UTF-8.
124 If 'base64', the raw bytes contents will be encoded as base64.
125 If not specified, try to decode as UTF-8, and fall back to base64
126 """
127 if not os.path.isfile(os_path):
128 raise HTTPError(400, "Cannot read non-file %s" % os_path)
129
130 with self.open(os_path, 'rb') as f:
131 bcontent = f.read()
132
133 if format is None or format == 'text':
134 # Try to interpret as unicode if format is unknown or if unicode
135 # was explicitly requested.
136 try:
137 return bcontent.decode('utf8'), 'text'
138 except UnicodeError:
139 if format == 'text':
140 raise HTTPError(
141 400,
142 "%s is not UTF-8 encoded" % os_path,
143 reason='bad format',
144 )
145 return base64.encodestring(bcontent).decode('ascii'), 'base64'
146
147 def _save_file(self, os_path, content, format):
148 """Save content of a generic file."""
149 if format not in {'text', 'base64'}:
150 raise HTTPError(
151 400,
152 "Must specify format of file contents as 'text' or 'base64'",
153 )
154 try:
155 if format == 'text':
156 bcontent = content.encode('utf8')
157 else:
158 b64_bytes = content.encode('ascii')
159 bcontent = base64.decodestring(b64_bytes)
160 except Exception as e:
161 raise HTTPError(
162 400, u'Encoding error saving %s: %s' % (os_path, e)
163 )
164
165 with self.atomic_writing(os_path, text=False) as f:
166 f.write(bcontent)
@@ -3,9 +3,7 b''
3 # Copyright (c) IPython Development Team.
3 # Copyright (c) IPython Development Team.
4 # Distributed under the terms of the Modified BSD License.
4 # Distributed under the terms of the Modified BSD License.
5
5
6 import base64
6
7 from contextlib import contextmanager
8 import errno
9 import io
7 import io
10 import os
8 import os
11 import shutil
9 import shutil
@@ -13,25 +11,23 b' import mimetypes'
13
11
14 from tornado import web
12 from tornado import web
15
13
16 from .manager import (
14 from .filecheckpoints import FileCheckpointManager
17 CheckpointManager,
15 from .fileio import FileManagerMixin
18 ContentsManager,
16 from .manager import ContentsManager
19 )
17
20 from IPython import nbformat
18 from IPython import nbformat
21 from IPython.utils.io import atomic_writing
22 from IPython.utils.importstring import import_item
19 from IPython.utils.importstring import import_item
23 from IPython.utils.path import ensure_dir_exists
24 from IPython.utils.traitlets import Any, Unicode, Bool, TraitError
20 from IPython.utils.traitlets import Any, Unicode, Bool, TraitError
25 from IPython.utils.py3compat import getcwd, string_types, str_to_unicode
21 from IPython.utils.py3compat import getcwd, string_types
26 from IPython.utils import tz
22 from IPython.utils import tz
27 from IPython.html.utils import (
23 from IPython.html.utils import (
28 is_hidden,
24 is_hidden,
29 to_api_path,
25 to_api_path,
30 to_os_path,
31 )
26 )
32
27
33 _script_exporter = None
28 _script_exporter = None
34
29
30
35 def _post_save_script(model, os_path, contents_manager, **kwargs):
31 def _post_save_script(model, os_path, contents_manager, **kwargs):
36 """convert notebooks to Python script after save with nbconvert
32 """convert notebooks to Python script after save with nbconvert
37
33
@@ -56,346 +52,6 b' def _post_save_script(model, os_path, contents_manager, **kwargs):'
56 f.write(script)
52 f.write(script)
57
53
58
54
59 class FileManagerMixin(object):
60 """
61 Mixin for ContentsAPI classes that interact with the filesystem.
62
63 Provides facilities for reading, writing, and copying both notebooks and
64 generic files.
65
66 Shared by FileContentsManager and FileCheckpointManager.
67
68 Note
69 ----
70 Classes using this mixin must provide the following attributes:
71
72 root_dir : unicode
73 A directory against against which API-style paths are to be resolved.
74
75 log : logging.Logger
76 """
77
78 @contextmanager
79 def open(self, os_path, *args, **kwargs):
80 """wrapper around io.open that turns permission errors into 403"""
81 with self.perm_to_403(os_path):
82 with io.open(os_path, *args, **kwargs) as f:
83 yield f
84
85 @contextmanager
86 def atomic_writing(self, os_path, *args, **kwargs):
87 """wrapper around atomic_writing that turns permission errors into 403"""
88 with self.perm_to_403(os_path):
89 with atomic_writing(os_path, *args, **kwargs) as f:
90 yield f
91
92 @contextmanager
93 def perm_to_403(self, os_path=''):
94 """context manager for turning permission errors into 403."""
95 try:
96 yield
97 except OSError as e:
98 if e.errno in {errno.EPERM, errno.EACCES}:
99 # make 403 error message without root prefix
100 # this may not work perfectly on unicode paths on Python 2,
101 # but nobody should be doing that anyway.
102 if not os_path:
103 os_path = str_to_unicode(e.filename or 'unknown file')
104 path = to_api_path(os_path, root=self.root_dir)
105 raise web.HTTPError(403, u'Permission denied: %s' % path)
106 else:
107 raise
108
109 def _copy(self, src, dest):
110 """copy src to dest
111
112 like shutil.copy2, but log errors in copystat
113 """
114 shutil.copyfile(src, dest)
115 try:
116 shutil.copystat(src, dest)
117 except OSError:
118 self.log.debug("copystat on %s failed", dest, exc_info=True)
119
120 def _get_os_path(self, path):
121 """Given an API path, return its file system path.
122
123 Parameters
124 ----------
125 path : string
126 The relative API path to the named file.
127
128 Returns
129 -------
130 path : string
131 Native, absolute OS path to for a file.
132 """
133 return to_os_path(path, self.root_dir)
134
135 def _read_notebook(self, os_path, as_version=4):
136 """Read a notebook from an os path."""
137 with self.open(os_path, 'r', encoding='utf-8') as f:
138 try:
139 return nbformat.read(f, as_version=as_version)
140 except Exception as e:
141 raise web.HTTPError(
142 400,
143 u"Unreadable Notebook: %s %r" % (os_path, e),
144 )
145
146 def _save_notebook(self, os_path, nb):
147 """Save a notebook to an os_path."""
148 with self.atomic_writing(os_path, encoding='utf-8') as f:
149 nbformat.write(nb, f, version=nbformat.NO_CONVERT)
150
151 def _read_file(self, os_path, format):
152 """Read a non-notebook file.
153
154 os_path: The path to be read.
155 format:
156 If 'text', the contents will be decoded as UTF-8.
157 If 'base64', the raw bytes contents will be encoded as base64.
158 If not specified, try to decode as UTF-8, and fall back to base64
159 """
160 if not os.path.isfile(os_path):
161 raise web.HTTPError(400, "Cannot read non-file %s" % os_path)
162
163 with self.open(os_path, 'rb') as f:
164 bcontent = f.read()
165
166 if format is None or format == 'text':
167 # Try to interpret as unicode if format is unknown or if unicode
168 # was explicitly requested.
169 try:
170 return bcontent.decode('utf8'), 'text'
171 except UnicodeError as e:
172 if format == 'text':
173 raise web.HTTPError(
174 400,
175 "%s is not UTF-8 encoded" % os_path,
176 reason='bad format',
177 )
178 return base64.encodestring(bcontent).decode('ascii'), 'base64'
179
180 def _save_file(self, os_path, content, format):
181 """Save content of a generic file."""
182 if format not in {'text', 'base64'}:
183 raise web.HTTPError(
184 400,
185 "Must specify format of file contents as 'text' or 'base64'",
186 )
187 try:
188 if format == 'text':
189 bcontent = content.encode('utf8')
190 else:
191 b64_bytes = content.encode('ascii')
192 bcontent = base64.decodestring(b64_bytes)
193 except Exception as e:
194 raise web.HTTPError(400, u'Encoding error saving %s: %s' % (os_path, e))
195
196 with self.atomic_writing(os_path, text=False) as f:
197 f.write(bcontent)
198
199
200 class FileCheckpointManager(FileManagerMixin, CheckpointManager):
201 """
202 A CheckpointManager that caches checkpoints for files in adjacent
203 directories.
204 """
205
206 checkpoint_dir = Unicode(
207 '.ipynb_checkpoints',
208 config=True,
209 help="""The directory name in which to keep file checkpoints
210
211 This is a path relative to the file's own directory.
212
213 By default, it is .ipynb_checkpoints
214 """,
215 )
216
217 root_dir = Unicode(config=True)
218
219 def _root_dir_default(self):
220 try:
221 return self.parent.root_dir
222 except AttributeError:
223 return getcwd()
224
225 # ContentsManager-dependent checkpoint API
226 def create_checkpoint(self, contents_mgr, path):
227 """
228 Create a checkpoint.
229
230 If contents_mgr is backed by the local filesystem, just copy the
231 appropriate file to the checkpoint directory. Otherwise, ask the
232 ContentsManager for a model and write it ourselves.
233 """
234 if contents_mgr.backend == 'local_file':
235 # We know that the file is in the local filesystem, so just copy
236 # from the base location to our location.
237 checkpoint_id = u'checkpoint'
238 src_path = contents_mgr._get_os_path(path)
239 dest_path = self.checkpoint_path(checkpoint_id, path)
240 self._copy(src_path, dest_path)
241 return self.checkpoint_model(checkpoint_id, dest_path)
242 else:
243 return super(FileCheckpointManager, self).create_checkpoint(
244 contents_mgr, path,
245 )
246
247 def restore_checkpoint(self, contents_mgr, checkpoint_id, path):
248 """
249 Restore a checkpoint.
250
251 If contents_mgr is backed by the local filesystem, just copy the
252 appropriate file from the checkpoint directory. Otherwise, load the
253 model and pass it to ContentsManager.save.
254 """
255 if contents_mgr.backend == 'local_file':
256 # We know that the file is in the local filesystem, so just copy
257 # from our base location to the location expected by content
258 src_path = self.checkpoint_path(checkpoint_id, path)
259 dest_path = contents_mgr._get_os_path(path)
260 self._copy(src_path, dest_path)
261 else:
262 super(FileCheckpointManager, self).restore_checkpoint(
263 contents_mgr, checkpoint_id, path
264 )
265
266 # ContentsManager-independent checkpoint API
267 def rename_checkpoint(self, checkpoint_id, old_path, new_path):
268 """Rename a checkpoint from old_path to new_path."""
269 old_cp_path = self.checkpoint_path(checkpoint_id, old_path)
270 new_cp_path = self.checkpoint_path(checkpoint_id, new_path)
271 if os.path.isfile(old_cp_path):
272 self.log.debug(
273 "Renaming checkpoint %s -> %s",
274 old_cp_path,
275 new_cp_path,
276 )
277 with self.perm_to_403():
278 shutil.move(old_cp_path, new_cp_path)
279
280 def delete_checkpoint(self, checkpoint_id, path):
281 """delete a file's checkpoint"""
282 path = path.strip('/')
283 cp_path = self.checkpoint_path(checkpoint_id, path)
284 if not os.path.isfile(cp_path):
285 self.no_such_checkpoint(path, checkpoint_id)
286
287 self.log.debug("unlinking %s", cp_path)
288 with self.perm_to_403():
289 os.unlink(cp_path)
290
291 def list_checkpoints(self, path):
292 """list the checkpoints for a given file
293
294 This contents manager currently only supports one checkpoint per file.
295 """
296 path = path.strip('/')
297 checkpoint_id = "checkpoint"
298 os_path = self.checkpoint_path(checkpoint_id, path)
299 if not os.path.isfile(os_path):
300 return []
301 else:
302 return [self.checkpoint_model(checkpoint_id, os_path)]
303
304 # Checkpoint-related utilities
305 def checkpoint_path(self, checkpoint_id, path):
306 """find the path to a checkpoint"""
307 path = path.strip('/')
308 parent, name = ('/' + path).rsplit('/', 1)
309 parent = parent.strip('/')
310 basename, ext = os.path.splitext(name)
311 filename = u"{name}-{checkpoint_id}{ext}".format(
312 name=basename,
313 checkpoint_id=checkpoint_id,
314 ext=ext,
315 )
316 os_path = self._get_os_path(path=parent)
317 cp_dir = os.path.join(os_path, self.checkpoint_dir)
318 with self.perm_to_403():
319 ensure_dir_exists(cp_dir)
320 cp_path = os.path.join(cp_dir, filename)
321 return cp_path
322
323 def checkpoint_model(self, checkpoint_id, os_path):
324 """construct the info dict for a given checkpoint"""
325 stats = os.stat(os_path)
326 last_modified = tz.utcfromtimestamp(stats.st_mtime)
327 info = dict(
328 id=checkpoint_id,
329 last_modified=last_modified,
330 )
331 return info
332
333 def create_file_checkpoint(self, content, format, path):
334 """Create a checkpoint from the current content of a notebook."""
335 path = path.strip('/')
336 # only the one checkpoint ID:
337 checkpoint_id = u"checkpoint"
338 os_checkpoint_path = self.checkpoint_path(checkpoint_id, path)
339 self.log.debug("creating checkpoint for %s", path)
340 with self.perm_to_403():
341 self._save_file(os_checkpoint_path, content, format=format)
342
343 # return the checkpoint info
344 return self.checkpoint_model(checkpoint_id, os_checkpoint_path)
345
346 def create_notebook_checkpoint(self, nb, path):
347 """Create a checkpoint from the current content of a notebook."""
348 path = path.strip('/')
349 # only the one checkpoint ID:
350 checkpoint_id = u"checkpoint"
351 os_checkpoint_path = self.checkpoint_path(checkpoint_id, path)
352 self.log.debug("creating checkpoint for %s", path)
353 with self.perm_to_403():
354 self._save_notebook(os_checkpoint_path, nb)
355
356 # return the checkpoint info
357 return self.checkpoint_model(checkpoint_id, os_checkpoint_path)
358
359 def get_checkpoint(self, checkpoint_id, path, type):
360 """Get the content of a checkpoint.
361
362 Returns a model suitable for passing to ContentsManager.save.
363 """
364 path = path.strip('/')
365 self.log.info("restoring %s from checkpoint %s", path, checkpoint_id)
366 os_checkpoint_path = self.checkpoint_path(checkpoint_id, path)
367 if not os.path.isfile(os_checkpoint_path):
368 self.no_such_checkpoint(path, checkpoint_id)
369
370 if type == 'notebook':
371 return {
372 'type': type,
373 'content': self._read_notebook(
374 os_checkpoint_path,
375 as_version=4,
376 ),
377 }
378 elif type == 'file':
379 content, format = self._read_file(os_checkpoint_path, format=None)
380 return {
381 'type': type,
382 'content': content,
383 'format': format,
384 }
385 else:
386 raise web.HTTPError(
387 500,
388 u'Unexpected type %s' % type
389 )
390
391 # Error Handling
392 def no_such_checkpoint(self, path, checkpoint_id):
393 raise web.HTTPError(
394 404,
395 u'Checkpoint does not exist: %s@%s' % (path, checkpoint_id)
396 )
397
398
399 class FileContentsManager(FileManagerMixin, ContentsManager):
55 class FileContentsManager(FileManagerMixin, ContentsManager):
400
56
401 root_dir = Unicode(config=True)
57 root_dir = Unicode(config=True)
@@ -468,9 +124,6 b' class FileContentsManager(FileManagerMixin, ContentsManager):'
468 def _checkpoint_manager_class_default(self):
124 def _checkpoint_manager_class_default(self):
469 return FileCheckpointManager
125 return FileCheckpointManager
470
126
471 def _backend_default(self):
472 return 'local_file'
473
474 def is_hidden(self, path):
127 def is_hidden(self, path):
475 """Does the API style path correspond to a hidden directory or file?
128 """Does the API style path correspond to a hidden directory or file?
476
129
@@ -11,6 +11,7 b' import re'
11
11
12 from tornado.web import HTTPError
12 from tornado.web import HTTPError
13
13
14 from .checkpoints import CheckpointManager
14 from IPython.config.configurable import LoggingConfigurable
15 from IPython.config.configurable import LoggingConfigurable
15 from IPython.nbformat import sign, validate, ValidationError
16 from IPython.nbformat import sign, validate, ValidationError
16 from IPython.nbformat.v4 import new_notebook
17 from IPython.nbformat.v4 import new_notebook
@@ -29,77 +30,6 b' from IPython.utils.py3compat import string_types'
29 copy_pat = re.compile(r'\-Copy\d*\.')
30 copy_pat = re.compile(r'\-Copy\d*\.')
30
31
31
32
32 class CheckpointManager(LoggingConfigurable):
33 """
34 Base class for managing checkpoints for a ContentsManager.
35 """
36
37 def create_checkpoint(self, contents_mgr, path):
38 model = contents_mgr.get(path, content=True)
39 type = model['type']
40 if type == 'notebook':
41 return self.create_notebook_checkpoint(
42 model['content'],
43 path,
44 )
45 elif type == 'file':
46 return self.create_file_checkpoint(
47 model['content'],
48 model['format'],
49 path,
50 )
51
52 def restore_checkpoint(self, contents_mgr, checkpoint_id, path):
53 """Restore a checkpoint."""
54 type = contents_mgr.get(path, content=False)['type']
55 model = self.get_checkpoint(checkpoint_id, path, type)
56 contents_mgr.save(model, path)
57
58 def create_file_checkpoint(self, content, format, path):
59 """Create a checkpoint of the current state of a file
60
61 Returns a checkpoint model for the new checkpoint.
62 """
63 raise NotImplementedError("must be implemented in a subclass")
64
65 def create_notebook_checkpoint(self, nb, path):
66 """Create a checkpoint of the current state of a file
67
68 Returns a checkpoint model for the new checkpoint.
69 """
70 raise NotImplementedError("must be implemented in a subclass")
71
72 def get_checkpoint(self, checkpoint_id, path, type):
73 """Get the content of a checkpoint.
74
75 Returns an unvalidated model with the same structure as
76 the return value of ContentsManager.get
77 """
78 raise NotImplementedError("must be implemented in a subclass")
79
80 def rename_checkpoint(self, checkpoint_id, old_path, new_path):
81 """Rename a single checkpoint from old_path to new_path."""
82 raise NotImplementedError("must be implemented in a subclass")
83
84 def delete_checkpoint(self, checkpoint_id, path):
85 """delete a checkpoint for a file"""
86 raise NotImplementedError("must be implemented in a subclass")
87
88 def list_checkpoints(self, path):
89 """Return a list of checkpoints for a given file"""
90 raise NotImplementedError("must be implemented in a subclass")
91
92 def rename_all_checkpoints(self, old_path, new_path):
93 """Rename all checkpoints for old_path to new_path."""
94 for cp in self.list_checkpoints(old_path):
95 self.rename_checkpoint(cp['id'], old_path, new_path)
96
97 def delete_all_checkpoints(self, path):
98 """Delete all checkpoints for the given path."""
99 for checkpoint in self.list_checkpoints(path):
100 self.delete_checkpoint(checkpoint['id'], path)
101
102
103 class ContentsManager(LoggingConfigurable):
33 class ContentsManager(LoggingConfigurable):
104 """Base class for serving files and directories.
34 """Base class for serving files and directories.
105
35
@@ -180,7 +110,6 b' class ContentsManager(LoggingConfigurable):'
180 checkpoint_manager_class = Type(CheckpointManager, config=True)
110 checkpoint_manager_class = Type(CheckpointManager, config=True)
181 checkpoint_manager = Instance(CheckpointManager, config=True)
111 checkpoint_manager = Instance(CheckpointManager, config=True)
182 checkpoint_manager_kwargs = Dict(allow_none=False, config=True)
112 checkpoint_manager_kwargs = Dict(allow_none=False, config=True)
183 backend = Unicode(default_value="")
184
113
185 def _checkpoint_manager_default(self):
114 def _checkpoint_manager_default(self):
186 return self.checkpoint_manager_class(**self.checkpoint_manager_kwargs)
115 return self.checkpoint_manager_class(**self.checkpoint_manager_kwargs)
@@ -13,6 +13,9 b' pjoin = os.path.join'
13
13
14 import requests
14 import requests
15
15
16 from ..filecheckpoints import GenericFileCheckpointManager
17
18 from IPython.config import Config
16 from IPython.html.utils import url_path_join, url_escape, to_os_path
19 from IPython.html.utils import url_path_join, url_escape, to_os_path
17 from IPython.html.tests.launchnotebook import NotebookTestBase, assert_http_error
20 from IPython.html.tests.launchnotebook import NotebookTestBase, assert_http_error
18 from IPython.nbformat import read, write, from_dict
21 from IPython.nbformat import read, write, from_dict
@@ -615,24 +618,12 b' class APITest(NotebookTestBase):'
615 with self.patch_cp_root(td):
618 with self.patch_cp_root(td):
616 self.test_file_checkpoints()
619 self.test_file_checkpoints()
617
620
618 @contextmanager
619 def patch_cm_backend(self):
620 """
621 Temporarily patch our ContentsManager to present a different backend.
622 """
623 mgr = self.notebook.contents_manager
624 old_backend = mgr.backend
625 mgr.backend = ""
626 try:
627 yield
628 finally:
629 mgr.backend = old_backend
630
631 def test_checkpoints_empty_backend(self):
632 with self.patch_cm_backend():
633 self.test_checkpoints()
634
635 with self.patch_cm_backend():
636 self.test_file_checkpoints()
637
621
622 class GenericFileCheckpointsAPITest(APITest):
623 """
624 Run the tests from APITest with GenericFileCheckpointManager.
625 """
638
626
627 config = Config()
628 config.FileContentsManager.checkpoint_manager_class = \
629 GenericFileCheckpointManager
General Comments 0
You need to be logged in to leave comments. Login now