##// END OF EJS Templates
url encode/decode tests added to nbmanager
Zachary Sailer -
Show More
@@ -1,349 +1,349 b''
1 1 """A notebook manager that uses the local file system for storage.
2 2
3 3 Authors:
4 4
5 5 * Brian Granger
6 6 """
7 7
8 8 #-----------------------------------------------------------------------------
9 9 # Copyright (C) 2011 The IPython Development Team
10 10 #
11 11 # Distributed under the terms of the BSD License. The full license is in
12 12 # the file COPYING, distributed as part of this software.
13 13 #-----------------------------------------------------------------------------
14 14
15 15 #-----------------------------------------------------------------------------
16 16 # Imports
17 17 #-----------------------------------------------------------------------------
18 18
19 19 import datetime
20 20 import io
21 21 import os
22 22 import glob
23 23 import shutil
24 24
25 25 from unicodedata import normalize
26 26
27 27 from tornado import web
28 28
29 29 from .nbmanager import NotebookManager
30 30 from IPython.nbformat import current
31 31 from IPython.utils.traitlets import Unicode, Dict, Bool, TraitError
32 32 from IPython.utils import tz
33 33
34 34 #-----------------------------------------------------------------------------
35 35 # Classes
36 36 #-----------------------------------------------------------------------------
37 37
38 38 class FileNotebookManager(NotebookManager):
39 39
40 40 save_script = Bool(False, config=True,
41 41 help="""Automatically create a Python script when saving the notebook.
42 42
43 43 For easier use of import, %run and %load across notebooks, a
44 44 <notebook-name>.py script will be created next to any
45 45 <notebook-name>.ipynb on each save. This can also be set with the
46 46 short `--script` flag.
47 47 """
48 48 )
49 49
50 50 checkpoint_dir = Unicode(config=True,
51 51 help="""The location in which to keep notebook checkpoints
52 52
53 53 By default, it is notebook-dir/.ipynb_checkpoints
54 54 """
55 55 )
56 56 def _checkpoint_dir_default(self):
57 57 return os.path.join(self.notebook_dir, '.ipynb_checkpoints')
58 58
59 59 def _checkpoint_dir_changed(self, name, old, new):
60 60 """do a bit of validation of the checkpoint dir"""
61 61 if not os.path.isabs(new):
62 62 # If we receive a non-absolute path, make it absolute.
63 63 abs_new = os.path.abspath(new)
64 64 self.checkpoint_dir = abs_new
65 65 return
66 66 if os.path.exists(new) and not os.path.isdir(new):
67 67 raise TraitError("checkpoint dir %r is not a directory" % new)
68 68 if not os.path.exists(new):
69 69 self.log.info("Creating checkpoint dir %s", new)
70 70 try:
71 71 os.mkdir(new)
72 72 except:
73 73 raise TraitError("Couldn't create checkpoint dir %r" % new)
74 74
75 75 filename_ext = Unicode(u'.ipynb')
76 76
77 77
78 78 def get_notebook_names(self, path):
79 79 """List all notebook names in the notebook dir."""
80 80 names = glob.glob(os.path.join(self.notebook_dir, path,
81 81 '*' + self.filename_ext))
82 82 names = [os.path.basename(name)
83 83 for name in names]
84 84 return names
85 85
86 86 def list_notebooks(self, path):
87 87 """List all notebooks in the notebook dir."""
88 88 notebook_names = self.get_notebook_names(path)
89 notebook_mapping = []
89 notebooks = []
90 90 for name in notebook_names:
91 91 model = self.notebook_model(name, path, content=False)
92 notebook_mapping.append(model)
93 return notebook_mapping
92 notebooks.append(model)
93 return notebooks
94 94
95 95 def change_notebook(self, data, notebook_name, notebook_path=None):
96 96 """Changes notebook"""
97 97 changes = data.keys()
98 98 response = 200
99 99 for change in changes:
100 100 full_path = self.get_path(notebook_name, notebook_path)
101 101 if change == "name":
102 102 new_path = self.get_path(data['name'], notebook_path)
103 103 if not os.path.isfile(new_path):
104 104 os.rename(full_path,
105 105 self.get_path(data['name'], notebook_path))
106 106 notebook_name = data['name']
107 107 else:
108 108 response = 409
109 109 if change == "path":
110 110 new_path = self.get_path(data['name'], data['path'])
111 111 stutil.move(full_path, new_path)
112 112 notebook_path = data['path']
113 113 if change == "content":
114 114 self.save_notebook(data, notebook_name, notebook_path)
115 115 model = self.notebook_model(notebook_name, notebook_path)
116 116 return model, response
117 117
118 118 def notebook_exists(self, notebook_path):
119 119 """Does a notebook exist?"""
120 120 return os.path.isfile(notebook_path)
121 121
122 122 def get_path(self, notebook_name, notebook_path=None):
123 123 """Return a full path to a notebook given its notebook_name."""
124 124 return self.get_path_by_name(notebook_name, notebook_path)
125 125
126 126 def get_path_by_name(self, name, notebook_path=None):
127 127 """Return a full path to a notebook given its name."""
128 128 filename = name #+ self.filename_ext
129 129 if notebook_path == None:
130 130 path = os.path.join(self.notebook_dir, filename)
131 131 else:
132 132 path = os.path.join(self.notebook_dir, notebook_path, filename)
133 133 return path
134 134
135 135 def read_notebook_object_from_path(self, path):
136 136 """read a notebook object from a path"""
137 137 info = os.stat(path)
138 138 last_modified = tz.utcfromtimestamp(info.st_mtime)
139 139 with open(path,'r') as f:
140 140 s = f.read()
141 141 try:
142 142 # v1 and v2 and json in the .ipynb files.
143 143 nb = current.reads(s, u'json')
144 144 except ValueError as e:
145 145 msg = u"Unreadable Notebook: %s" % e
146 146 raise web.HTTPError(400, msg, reason=msg)
147 147 return last_modified, nb
148 148
149 149 def read_notebook_object(self, notebook_name, notebook_path=None):
150 150 """Get the Notebook representation of a notebook by notebook_name."""
151 151 path = self.get_path(notebook_name, notebook_path)
152 152 if not os.path.isfile(path):
153 153 raise web.HTTPError(404, u'Notebook does not exist: %s' % notebook_name)
154 154 last_modified, nb = self.read_notebook_object_from_path(path)
155 155 # Always use the filename as the notebook name.
156 156 # Eventually we will get rid of the notebook name in the metadata
157 157 # but for now, that name is just an empty string. Until the notebooks
158 158 # web service knows about names in URLs we still pass the name
159 159 # back to the web app using the metadata though.
160 160 nb.metadata.name = os.path.splitext(os.path.basename(path))[0]
161 161 return last_modified, nb
162 162
163 163 def write_notebook_object(self, nb, notebook_name=None, notebook_path=None, new_name= None):
164 164 """Save an existing notebook object by notebook_name."""
165 165 if new_name == None:
166 166 try:
167 167 new_name = normalize('NFC', nb.metadata.name)
168 168 except AttributeError:
169 169 raise web.HTTPError(400, u'Missing notebook name')
170 170
171 171 new_path = notebook_path
172 172 old_name = notebook_name
173 173 old_checkpoints = self.list_checkpoints(old_name)
174 174
175 175 path = self.get_path_by_name(new_name, new_path)
176 176
177 177 # Right before we save the notebook, we write an empty string as the
178 178 # notebook name in the metadata. This is to prepare for removing
179 179 # this attribute entirely post 1.0. The web app still uses the metadata
180 180 # name for now.
181 181 nb.metadata.name = u''
182 182
183 183 try:
184 184 self.log.debug("Autosaving notebook %s", path)
185 185 with open(path,'w') as f:
186 186 current.write(nb, f, u'json')
187 187 except Exception as e:
188 188 raise web.HTTPError(400, u'Unexpected error while autosaving notebook: %s' % e)
189 189
190 190 # save .py script as well
191 191 if self.save_script:
192 192 pypath = os.path.splitext(path)[0] + '.py'
193 193 self.log.debug("Writing script %s", pypath)
194 194 try:
195 195 with io.open(pypath,'w', encoding='utf-8') as f:
196 196 current.write(nb, f, u'py')
197 197 except Exception as e:
198 198 raise web.HTTPError(400, u'Unexpected error while saving notebook as script: %s' % e)
199 199
200 200 if old_name != None:
201 201 # remove old files if the name changed
202 202 if old_name != new_name:
203 203 # remove renamed original, if it exists
204 204 old_path = self.get_path_by_name(old_name, notebook_path)
205 205 if os.path.isfile(old_path):
206 206 self.log.debug("unlinking notebook %s", old_path)
207 207 os.unlink(old_path)
208 208
209 209 # cleanup old script, if it exists
210 210 if self.save_script:
211 211 old_pypath = os.path.splitext(old_path)[0] + '.py'
212 212 if os.path.isfile(old_pypath):
213 213 self.log.debug("unlinking script %s", old_pypath)
214 214 os.unlink(old_pypath)
215 215
216 216 # rename checkpoints to follow file
217 217 for cp in old_checkpoints:
218 218 checkpoint_id = cp['checkpoint_id']
219 219 old_cp_path = self.get_checkpoint_path_by_name(old_name, checkpoint_id)
220 220 new_cp_path = self.get_checkpoint_path_by_name(new_name, checkpoint_id)
221 221 if os.path.isfile(old_cp_path):
222 222 self.log.debug("renaming checkpoint %s -> %s", old_cp_path, new_cp_path)
223 223 os.rename(old_cp_path, new_cp_path)
224 224
225 225 return new_name
226 226
227 227 def delete_notebook(self, notebook_name, notebook_path):
228 228 """Delete notebook by notebook_name."""
229 229 nb_path = self.get_path(notebook_name, notebook_path)
230 230 if not os.path.isfile(nb_path):
231 231 raise web.HTTPError(404, u'Notebook does not exist: %s' % notebook_name)
232 232
233 233 # clear checkpoints
234 234 for checkpoint in self.list_checkpoints(notebook_name):
235 235 checkpoint_id = checkpoint['checkpoint_id']
236 236 path = self.get_checkpoint_path(notebook_name, checkpoint_id)
237 237 self.log.debug(path)
238 238 if os.path.isfile(path):
239 239 self.log.debug("unlinking checkpoint %s", path)
240 240 os.unlink(path)
241 241
242 242 self.log.debug("unlinking notebook %s", nb_path)
243 243 os.unlink(nb_path)
244 244
245 245 def increment_filename(self, basename, notebook_path=None):
246 246 """Return a non-used filename of the form basename<int>.
247 247
248 248 This searches through the filenames (basename0, basename1, ...)
249 249 until is find one that is not already being used. It is used to
250 250 create Untitled and Copy names that are unique.
251 251 """
252 252 i = 0
253 253 while True:
254 254 name = u'%s%i.ipynb' % (basename,i)
255 255 path = self.get_path_by_name(name, notebook_path)
256 256 if not os.path.isfile(path):
257 257 break
258 258 else:
259 259 i = i+1
260 260 return name
261 261
262 262 # Checkpoint-related utilities
263 263
264 264 def get_checkpoint_path_by_name(self, name, checkpoint_id, notebook_path=None):
265 265 """Return a full path to a notebook checkpoint, given its name and checkpoint id."""
266 266 filename = u"{name}-{checkpoint_id}{ext}".format(
267 267 name=name,
268 268 checkpoint_id=checkpoint_id,
269 269 ext=self.filename_ext,
270 270 )
271 271 if notebook_path ==None:
272 272 path = os.path.join(self.checkpoint_dir, filename)
273 273 else:
274 274 path = os.path.join(notebook_path, self.checkpoint_dir, filename)
275 275 return path
276 276
277 277 def get_checkpoint_path(self, notebook_name, checkpoint_id, notebook_path=None):
278 278 """find the path to a checkpoint"""
279 279 name = notebook_name
280 280 return self.get_checkpoint_path_by_name(name, checkpoint_id, notebook_path)
281 281
282 282 def get_checkpoint_info(self, notebook_name, checkpoint_id, notebook_path=None):
283 283 """construct the info dict for a given checkpoint"""
284 284 path = self.get_checkpoint_path(notebook_name, checkpoint_id, notebook_path)
285 285 stats = os.stat(path)
286 286 last_modified = tz.utcfromtimestamp(stats.st_mtime)
287 287 info = dict(
288 288 checkpoint_id = checkpoint_id,
289 289 last_modified = last_modified,
290 290 )
291 291
292 292 return info
293 293
294 294 # public checkpoint API
295 295
296 296 def create_checkpoint(self, notebook_name, notebook_path=None):
297 297 """Create a checkpoint from the current state of a notebook"""
298 298 nb_path = self.get_path(notebook_name, notebook_path)
299 299 # only the one checkpoint ID:
300 300 checkpoint_id = u"checkpoint"
301 301 cp_path = self.get_checkpoint_path(notebook_name, checkpoint_id, notebook_path)
302 302 self.log.debug("creating checkpoint for notebook %s", notebook_name)
303 303 if not os.path.exists(self.checkpoint_dir):
304 304 os.mkdir(self.checkpoint_dir)
305 305 shutil.copy2(nb_path, cp_path)
306 306
307 307 # return the checkpoint info
308 308 return self.get_checkpoint_info(notebook_name, checkpoint_id, notebook_path)
309 309
310 310 def list_checkpoints(self, notebook_name, notebook_path=None):
311 311 """list the checkpoints for a given notebook
312 312
313 313 This notebook manager currently only supports one checkpoint per notebook.
314 314 """
315 315 checkpoint_id = "checkpoint"
316 316 path = self.get_checkpoint_path(notebook_name, checkpoint_id, notebook_path)
317 317 if not os.path.exists(path):
318 318 return []
319 319 else:
320 320 return [self.get_checkpoint_info(notebook_name, checkpoint_id, notebook_path)]
321 321
322 322
323 323 def restore_checkpoint(self, notebook_name, checkpoint_id, notebook_path=None):
324 324 """restore a notebook to a checkpointed state"""
325 325 self.log.info("restoring Notebook %s from checkpoint %s", notebook_name, checkpoint_id)
326 326 nb_path = self.get_path(notebook_name, notebook_path)
327 327 cp_path = self.get_checkpoint_path(notebook_name, checkpoint_id, notebook_path)
328 328 if not os.path.isfile(cp_path):
329 329 self.log.debug("checkpoint file does not exist: %s", cp_path)
330 330 raise web.HTTPError(404,
331 331 u'Notebook checkpoint does not exist: %s-%s' % (notebook_name, checkpoint_id)
332 332 )
333 333 # ensure notebook is readable (never restore from an unreadable notebook)
334 334 last_modified, nb = self.read_notebook_object_from_path(cp_path)
335 335 shutil.copy2(cp_path, nb_path)
336 336 self.log.debug("copying %s -> %s", cp_path, nb_path)
337 337
338 338 def delete_checkpoint(self, notebook_name, checkpoint_id, notebook_path=None):
339 339 """delete a notebook's checkpoint"""
340 340 path = self.get_checkpoint_path(notebook_name, checkpoint_id, notebook_path)
341 341 if not os.path.isfile(path):
342 342 raise web.HTTPError(404,
343 343 u'Notebook checkpoint does not exist: %s-%s' % (notebook_name, checkpoint_id)
344 344 )
345 345 self.log.debug("unlinking %s", path)
346 346 os.unlink(path)
347 347
348 348 def info_string(self):
349 349 return "Serving notebooks from local directory: %s" % self.notebook_dir
@@ -1,266 +1,263 b''
1 1 """A base class notebook manager.
2 2
3 3 Authors:
4 4
5 5 * Brian Granger
6 6 """
7 7
8 8 #-----------------------------------------------------------------------------
9 9 # Copyright (C) 2011 The IPython Development Team
10 10 #
11 11 # Distributed under the terms of the BSD License. The full license is in
12 12 # the file COPYING, distributed as part of this software.
13 13 #-----------------------------------------------------------------------------
14 14
15 15 #-----------------------------------------------------------------------------
16 16 # Imports
17 17 #-----------------------------------------------------------------------------
18 18
19 19 import os
20 20 import uuid
21 21
22 22 from tornado import web
23 23 from urllib import quote, unquote
24 24
25 25 from IPython.config.configurable import LoggingConfigurable
26 26 from IPython.nbformat import current
27 27 from IPython.utils.traitlets import List, Dict, Unicode, TraitError
28 28
29 29 #-----------------------------------------------------------------------------
30 30 # Classes
31 31 #-----------------------------------------------------------------------------
32 32
33 33 class NotebookManager(LoggingConfigurable):
34 34
35 35 # Todo:
36 36 # The notebook_dir attribute is used to mean a couple of different things:
37 37 # 1. Where the notebooks are stored if FileNotebookManager is used.
38 38 # 2. The cwd of the kernel for a project.
39 39 # Right now we use this attribute in a number of different places and
40 40 # we are going to have to disentangle all of this.
41 41 notebook_dir = Unicode(os.getcwdu(), config=True, help="""
42 42 The directory to use for notebooks.
43 43 """)
44 44
45 45 def named_notebook_path(self, notebook_path):
46 46 """Given a notebook_path name, returns a (name, path) tuple, where
47 47 name is a .ipynb file, and path is the directory for the file, which
48 48 *always* starts *and* ends with a '/' character.
49 49
50 50 Parameters
51 51 ----------
52 52 notebook_path : string
53 53 A path that may be a .ipynb name or a directory
54 54
55 55 Returns
56 56 -------
57 57 name : string or None
58 58 the filename of the notebook, or None if not a .ipynb extension
59 59 path : string
60 60 the path to the directory which contains the notebook
61 61 """
62 62 names = notebook_path.split('/')
63 63 names = [n for n in names if n != ''] # remove duplicate splits
64 64
65 65 names = [''] + names
66 66
67 67 if names and names[-1].endswith(".ipynb"):
68 68 name = names[-1]
69 69 path = "/".join(names[:-1]) + '/'
70 70 else:
71 71 name = None
72 72 path = "/".join(names) + '/'
73 73 return name, path
74 74
75 75 def url_encode(self, path):
76 parts = path.split('/')
76 parts = os.path.split(path)
77 #parts = path.split('/')
77 78 return os.path.join(*[quote(p) for p in parts])
78 79
79 80 def url_decode(self, path):
80 parts = path.split('/')
81 parts = os.path.split(path)
82 #parts = path.split('/')
81 83 return os.path.join(*[unquote(p) for p in parts])
82 84
83 85 def _notebook_dir_changed(self, new):
84 86 """do a bit of validation of the notebook dir"""
85 87 if not os.path.isabs(new):
86 88 # If we receive a non-absolute path, make it absolute.
87 89 abs_new = os.path.abspath(new)
88 90 #self.notebook_dir = os.path.dirname(abs_new)
89 91 return
90 92 if os.path.exists(new) and not os.path.isdir(new):
91 93 raise TraitError("notebook dir %r is not a directory" % new)
92 94 if not os.path.exists(new):
93 95 self.log.info("Creating notebook dir %s", new)
94 96 try:
95 97 os.mkdir(new)
96 98 except:
97 99 raise TraitError("Couldn't create notebook dir %r" % new)
98 100
99 101 allowed_formats = List([u'json',u'py'])
100 102
101 103 def add_new_folder(self, path=None):
102 104 new_path = os.path.join(self.notebook_dir, path)
103 105 if not os.path.exists(new_path):
104 106 os.makedirs(new_path)
105 107 else:
106 108 raise web.HTTPError(409, u'Directory already exists or creation permission not allowed.')
107 109
108 110 def load_notebook_names(self, path):
109 111 """Load the notebook names into memory.
110 112
111 113 This should be called once immediately after the notebook manager
112 114 is created to load the existing notebooks into the mapping in
113 115 memory.
114 116 """
115 117 self.list_notebooks(path)
116 118
117 119 def list_notebooks(self):
118 120 """List all notebooks.
119 121
120 122 This returns a list of dicts, each of the form::
121 123
122 124 dict(notebook_id=notebook,name=name)
123 125
124 126 This list of dicts should be sorted by name::
125 127
126 128 data = sorted(data, key=lambda item: item['name'])
127 129 """
128 130 raise NotImplementedError('must be implemented in a subclass')
129 131
130
131 def notebook_exists(self, notebook_path):
132 """Does a notebook exist?"""
133
134
135 132 def notebook_model(self, notebook_name, notebook_path=None, content=True):
136 133 """ Creates the standard notebook model """
137 134 last_modified, contents = self.read_notebook_object(notebook_name, notebook_path)
138 135 model = {"name": notebook_name,
139 136 "path": notebook_path,
140 137 "last_modified (UTC)": last_modified.ctime()}
141 138 if content == True:
142 139 model['content'] = contents
143 140 return model
144 141
145 142 def get_notebook(self, notebook_name, notebook_path=None, format=u'json'):
146 143 """Get the representation of a notebook in format by notebook_name."""
147 144 format = unicode(format)
148 145 if format not in self.allowed_formats:
149 146 raise web.HTTPError(415, u'Invalid notebook format: %s' % format)
150 147 kwargs = {}
151 148 last_mod, nb = self.read_notebook_object(notebook_name, notebook_path)
152 149 if format == 'json':
153 150 # don't split lines for sending over the wire, because it
154 151 # should match the Python in-memory format.
155 152 kwargs['split_lines'] = False
156 153 representation = current.writes(nb, format, **kwargs)
157 154 name = nb.metadata.get('name', 'notebook')
158 155 return last_mod, representation, name
159 156
160 157 def read_notebook_object(self, notebook_name, notebook_path=None):
161 158 """Get the object representation of a notebook by notebook_id."""
162 159 raise NotImplementedError('must be implemented in a subclass')
163 160
164 161 def save_new_notebook(self, data, notebook_path = None, name=None, format=u'json'):
165 162 """Save a new notebook and return its name.
166 163
167 164 If a name is passed in, it overrides any values in the notebook data
168 165 and the value in the data is updated to use that value.
169 166 """
170 167 if format not in self.allowed_formats:
171 168 raise web.HTTPError(415, u'Invalid notebook format: %s' % format)
172 169
173 170 try:
174 171 nb = current.reads(data.decode('utf-8'), format)
175 172 except:
176 173 raise web.HTTPError(400, u'Invalid JSON data')
177 174
178 175 if name is None:
179 176 try:
180 177 name = nb.metadata.name
181 178 except AttributeError:
182 179 raise web.HTTPError(400, u'Missing notebook name')
183 180 nb.metadata.name = name
184 181
185 182 notebook_name = self.write_notebook_object(nb, notebook_path=notebook_path)
186 183 return notebook_name
187 184
188 185 def save_notebook(self, data, notebook_path=None, name=None, new_name=None, format=u'json'):
189 186 """Save an existing notebook by notebook_name."""
190 187 if format not in self.allowed_formats:
191 188 raise web.HTTPError(415, u'Invalid notebook format: %s' % format)
192 189
193 190 try:
194 191 nb = current.reads(data.decode('utf-8'), format)
195 192 except:
196 193 raise web.HTTPError(400, u'Invalid JSON data')
197 194
198 195 if name is not None:
199 196 nb.metadata.name = name
200 197 self.write_notebook_object(nb, name, notebook_path, new_name)
201 198
202 199 def write_notebook_object(self, nb, notebook_name=None, notebook_path=None, new_name=None):
203 200 """Write a notebook object and return its notebook_name.
204 201
205 202 If notebook_name is None, this method should create a new notebook_name.
206 203 If notebook_name is not None, this method should check to make sure it
207 204 exists and is valid.
208 205 """
209 206 raise NotImplementedError('must be implemented in a subclass')
210 207
211 208 def delete_notebook(self, notebook_name, notebook_path):
212 209 """Delete notebook by notebook_id."""
213 210 raise NotImplementedError('must be implemented in a subclass')
214 211
215 212 def increment_filename(self, name):
216 213 """Increment a filename to make it unique.
217 214
218 215 This exists for notebook stores that must have unique names. When a notebook
219 216 is created or copied this method constructs a unique filename, typically
220 217 by appending an integer to the name.
221 218 """
222 219 return name
223 220
224 221 def new_notebook(self, notebook_path=None):
225 222 """Create a new notebook and return its notebook_name."""
226 223 name = self.increment_filename('Untitled', notebook_path)
227 224 metadata = current.new_metadata(name=name)
228 225 nb = current.new_notebook(metadata=metadata)
229 226 notebook_name = self.write_notebook_object(nb, notebook_path=notebook_path)
230 227 return notebook_name
231 228
232 229 def copy_notebook(self, name, path=None):
233 230 """Copy an existing notebook and return its new notebook_name."""
234 231 last_mod, nb = self.read_notebook_object(name, path)
235 232 name = nb.metadata.name + '-Copy'
236 233 name = self.increment_filename(name, path)
237 234 nb.metadata.name = name
238 235 notebook_name = self.write_notebook_object(nb, notebook_path = path)
239 236 return notebook_name
240 237
241 238 # Checkpoint-related
242 239
243 240 def create_checkpoint(self, notebook_name, notebook_path=None):
244 241 """Create a checkpoint of the current state of a notebook
245 242
246 243 Returns a checkpoint_id for the new checkpoint.
247 244 """
248 245 raise NotImplementedError("must be implemented in a subclass")
249 246
250 247 def list_checkpoints(self, notebook_name, notebook_path=None):
251 248 """Return a list of checkpoints for a given notebook"""
252 249 return []
253 250
254 251 def restore_checkpoint(self, notebook_name, checkpoint_id, notebook_path=None):
255 252 """Restore a notebook from one of its checkpoints"""
256 253 raise NotImplementedError("must be implemented in a subclass")
257 254
258 255 def delete_checkpoint(self, notebook_name, checkpoint_id, notebook_path=None):
259 256 """delete a checkpoint for a notebook"""
260 257 raise NotImplementedError("must be implemented in a subclass")
261 258
262 259 def log_info(self):
263 260 self.log.info(self.info_string())
264 261
265 262 def info_string(self):
266 263 return "Serving notebooks"
@@ -1,64 +1,90 b''
1 1 """Tests for the notebook manager."""
2 2
3 3 import os
4 4 from unittest import TestCase
5 5 from tempfile import NamedTemporaryFile
6 6
7 7 from IPython.utils.tempdir import TemporaryDirectory
8 8 from IPython.utils.traitlets import TraitError
9 9
10 10 from ..filenbmanager import FileNotebookManager
11 11 from ..nbmanager import NotebookManager
12 12
13 13 class TestFileNotebookManager(TestCase):
14 14
15 15 def test_nb_dir(self):
16 16 with TemporaryDirectory() as td:
17 17 km = FileNotebookManager(notebook_dir=td)
18 18 self.assertEqual(km.notebook_dir, td)
19 19
20 20 def test_create_nb_dir(self):
21 21 with TemporaryDirectory() as td:
22 22 nbdir = os.path.join(td, 'notebooks')
23 23 km = FileNotebookManager(notebook_dir=nbdir)
24 24 self.assertEqual(km.notebook_dir, nbdir)
25 25
26 26 def test_missing_nb_dir(self):
27 27 with TemporaryDirectory() as td:
28 28 nbdir = os.path.join(td, 'notebook', 'dir', 'is', 'missing')
29 29 self.assertRaises(TraitError, FileNotebookManager, notebook_dir=nbdir)
30 30
31 31 def test_invalid_nb_dir(self):
32 32 with NamedTemporaryFile() as tf:
33 33 self.assertRaises(TraitError, FileNotebookManager, notebook_dir=tf.name)
34 34
35 35 class TestNotebookManager(TestCase):
36 36 def test_named_notebook_path(self):
37 37 nm = NotebookManager()
38
38
39 39 # doesn't end with ipynb, should just be path
40 40 name, path = nm.named_notebook_path('hello')
41 41 self.assertEqual(name, None)
42 42 self.assertEqual(path, '/hello/')
43
43
44 44 name, path = nm.named_notebook_path('/')
45 45 self.assertEqual(name, None)
46 46 self.assertEqual(path, '/')
47 47
48 48 name, path = nm.named_notebook_path('hello.ipynb')
49 49 self.assertEqual(name, 'hello.ipynb')
50 50 self.assertEqual(path, '/')
51
51
52 52 name, path = nm.named_notebook_path('/hello.ipynb')
53 53 self.assertEqual(name, 'hello.ipynb')
54 54 self.assertEqual(path, '/')
55
55
56 56 name, path = nm.named_notebook_path('/this/is/a/path/hello.ipynb')
57 57 self.assertEqual(name, 'hello.ipynb')
58 58 self.assertEqual(path, '/this/is/a/path/')
59
59
60 60 name, path = nm.named_notebook_path('path/without/leading/slash/hello.ipynb')
61 61 self.assertEqual(name, 'hello.ipynb')
62 62 self.assertEqual(path, '/path/without/leading/slash/')
63 63
64 def test_url_encode(self):
65 nm = NotebookManager()
66
67 # changes path or notebook name with special characters to url encoding
68 # these tests specifically encode paths with spaces
69 path = nm.url_encode('/this is a test/for spaces/')
70 self.assertEqual(path, '/this%20is%20a%20test/for%20spaces/')
71
72 path = nm.url_encode('notebook with space.ipynb')
73 self.assertEqual(path, 'notebook%20with%20space.ipynb')
74
75 path = nm.url_encode('/path with a/notebook and space.ipynb')
76 self.assertEqual(path, '/path%20with%20a/notebook%20and%20space.ipynb')
64 77
78 def test_url_decode(self):
79 nm = NotebookManager()
80
81 # decodes a url string to a plain string
82 # these tests decode paths with spaces
83 path = nm.url_decode('/this%20is%20a%20test/for%20spaces/')
84 self.assertEqual(path, '/this is a test/for spaces/')
85
86 path = nm.url_decode('notebook%20with%20space.ipynb')
87 self.assertEqual(path, 'notebook with space.ipynb')
88
89 path = nm.url_decode('/path%20with%20a/notebook%20and%20space.ipynb')
90 self.assertEqual(path, '/path with a/notebook and space.ipynb')
General Comments 0
You need to be logged in to leave comments. Login now