##// END OF EJS Templates
support deleting empty directories...
MinRK -
Show More
@@ -1,496 +1,506 b''
1 1 """A contents manager that uses the local file system for storage."""
2 2
3 3 # Copyright (c) IPython Development Team.
4 4 # Distributed under the terms of the Modified BSD License.
5 5
6 6 import base64
7 7 import io
8 8 import os
9 9 import glob
10 10 import shutil
11 11
12 12 from tornado import web
13 13
14 14 from .manager import ContentsManager
15 15 from IPython.nbformat import current
16 16 from IPython.utils.path import ensure_dir_exists
17 17 from IPython.utils.traitlets import Unicode, Bool, TraitError
18 18 from IPython.utils.py3compat import getcwd
19 19 from IPython.utils import tz
20 20 from IPython.html.utils import is_hidden, to_os_path
21 21
22 22
23 23 class FileContentsManager(ContentsManager):
24 24
25 25 root_dir = Unicode(getcwd(), config=True)
26 26
27 27 def _root_dir_changed(self, name, old, new):
28 28 """Do a bit of validation of the root_dir."""
29 29 if not os.path.isabs(new):
30 30 # If we receive a non-absolute path, make it absolute.
31 31 self.root_dir = os.path.abspath(new)
32 32 return
33 33 if not os.path.exists(new) or not os.path.isdir(new):
34 34 raise TraitError("%r is not a directory" % new)
35 35
36 36 checkpoint_dir = Unicode('.ipynb_checkpoints', config=True,
37 37 help="""The directory name in which to keep file checkpoints
38 38
39 39 This is a path relative to the file's own directory.
40 40
41 41 By default, it is .ipynb_checkpoints
42 42 """
43 43 )
44 44
45 45 def _copy(self, src, dest):
46 46 """copy src to dest
47 47
48 48 like shutil.copy2, but log errors in copystat
49 49 """
50 50 shutil.copyfile(src, dest)
51 51 try:
52 52 shutil.copystat(src, dest)
53 53 except OSError as e:
54 54 self.log.debug("copystat on %s failed", dest, exc_info=True)
55 55
56 56 def _get_os_path(self, name=None, path=''):
57 57 """Given a filename and a URL path, return its file system
58 58 path.
59 59
60 60 Parameters
61 61 ----------
62 62 name : string
63 63 A filename
64 64 path : string
65 65 The relative URL path (with '/' as separator) to the named
66 66 file.
67 67
68 68 Returns
69 69 -------
70 70 path : string
71 71 API path to be evaluated relative to root_dir.
72 72 """
73 73 if name is not None:
74 74 path = path + '/' + name
75 75 return to_os_path(path, self.root_dir)
76 76
77 77 def path_exists(self, path):
78 78 """Does the API-style path refer to an extant directory?
79 79
80 80 Parameters
81 81 ----------
82 82 path : string
83 83 The path to check. This is an API path (`/` separated,
84 84 relative to root_dir).
85 85
86 86 Returns
87 87 -------
88 88 exists : bool
89 89 Whether the path is indeed a directory.
90 90 """
91 91 path = path.strip('/')
92 92 os_path = self._get_os_path(path=path)
93 93 return os.path.isdir(os_path)
94 94
95 95 def is_hidden(self, path):
96 96 """Does the API style path correspond to a hidden directory or file?
97 97
98 98 Parameters
99 99 ----------
100 100 path : string
101 101 The path to check. This is an API path (`/` separated,
102 102 relative to root_dir).
103 103
104 104 Returns
105 105 -------
106 106 exists : bool
107 107 Whether the path is hidden.
108 108
109 109 """
110 110 path = path.strip('/')
111 111 os_path = self._get_os_path(path=path)
112 112 return is_hidden(os_path, self.root_dir)
113 113
114 114 def file_exists(self, name, path=''):
115 115 """Returns True if the file exists, else returns False.
116 116
117 117 Parameters
118 118 ----------
119 119 name : string
120 120 The name of the file you are checking.
121 121 path : string
122 122 The relative path to the file's directory (with '/' as separator)
123 123
124 124 Returns
125 125 -------
126 126 bool
127 127 """
128 128 path = path.strip('/')
129 129 nbpath = self._get_os_path(name, path=path)
130 130 return os.path.isfile(nbpath)
131 131
132 132 def exists(self, name=None, path=''):
133 133 """Returns True if the path [and name] exists, else returns False.
134 134
135 135 Parameters
136 136 ----------
137 137 name : string
138 138 The name of the file you are checking.
139 139 path : string
140 140 The relative path to the file's directory (with '/' as separator)
141 141
142 142 Returns
143 143 -------
144 144 bool
145 145 """
146 146 path = path.strip('/')
147 147 os_path = self._get_os_path(name, path=path)
148 148 return os.path.exists(os_path)
149 149
150 150 def _base_model(self, name, path=''):
151 151 """Build the common base of a contents model"""
152 152 os_path = self._get_os_path(name, path)
153 153 info = os.stat(os_path)
154 154 last_modified = tz.utcfromtimestamp(info.st_mtime)
155 155 created = tz.utcfromtimestamp(info.st_ctime)
156 156 # Create the base model.
157 157 model = {}
158 158 model['name'] = name
159 159 model['path'] = path
160 160 model['last_modified'] = last_modified
161 161 model['created'] = created
162 162 model['content'] = None
163 163 model['format'] = None
164 164 return model
165 165
166 166 def _dir_model(self, name, path='', content=True):
167 167 """Build a model for a directory
168 168
169 169 if content is requested, will include a listing of the directory
170 170 """
171 171 os_path = self._get_os_path(name, path)
172 172
173 173 if not os.path.isdir(os_path):
174 174 raise web.HTTPError(404, u'directory does not exist: %r' % os_path)
175 175 elif is_hidden(os_path, self.root_dir):
176 176 self.log.info("Refusing to serve hidden directory, via 404 Error")
177 177 raise web.HTTPError(404, u'directory does not exist: %r' % os_path)
178 178
179 179 if name is None:
180 180 if '/' in path:
181 181 path, name = path.rsplit('/', 1)
182 182 else:
183 183 name = ''
184 184 model = self._base_model(name, path)
185 185 model['type'] = 'directory'
186 186 dir_path = u'{}/{}'.format(path, name)
187 187 if content:
188 188 model['content'] = contents = []
189 189 for os_path in glob.glob(self._get_os_path('*', dir_path)):
190 190 name = os.path.basename(os_path)
191 191 if self.should_list(name) and not is_hidden(os_path, self.root_dir):
192 192 contents.append(self.get_model(name=name, path=dir_path, content=False))
193 193
194 194 model['format'] = 'json'
195 195
196 196 return model
197 197
198 198 def _file_model(self, name, path='', content=True):
199 199 """Build a model for a file
200 200
201 201 if content is requested, include the file contents.
202 202 UTF-8 text files will be unicode, binary files will be base64-encoded.
203 203 """
204 204 model = self._base_model(name, path)
205 205 model['type'] = 'file'
206 206 if content:
207 207 os_path = self._get_os_path(name, path)
208 208 try:
209 209 with io.open(os_path, 'r', encoding='utf-8') as f:
210 210 model['content'] = f.read()
211 211 except UnicodeError as e:
212 212 with io.open(os_path, 'rb') as f:
213 213 bcontent = f.read()
214 214 model['content'] = base64.encodestring(bcontent).decode('ascii')
215 215 model['format'] = 'base64'
216 216 else:
217 217 model['format'] = 'text'
218 218 return model
219 219
220 220
221 221 def _notebook_model(self, name, path='', content=True):
222 222 """Build a notebook model
223 223
224 224 if content is requested, the notebook content will be populated
225 225 as a JSON structure (not double-serialized)
226 226 """
227 227 model = self._base_model(name, path)
228 228 model['type'] = 'notebook'
229 229 if content:
230 230 os_path = self._get_os_path(name, path)
231 231 with io.open(os_path, 'r', encoding='utf-8') as f:
232 232 try:
233 233 nb = current.read(f, u'json')
234 234 except Exception as e:
235 235 raise web.HTTPError(400, u"Unreadable Notebook: %s %s" % (os_path, e))
236 236 self.mark_trusted_cells(nb, name, path)
237 237 model['content'] = nb
238 238 model['format'] = 'json'
239 239 return model
240 240
241 241 def get_model(self, name, path='', content=True):
242 242 """ Takes a path and name for an entity and returns its model
243 243
244 244 Parameters
245 245 ----------
246 246 name : str
247 247 the name of the target
248 248 path : str
249 249 the URL path that describes the relative path for the target
250 250
251 251 Returns
252 252 -------
253 253 model : dict
254 254 the contents model. If content=True, returns the contents
255 255 of the file or directory as well.
256 256 """
257 257 path = path.strip('/')
258 258
259 259 if not self.exists(name=name, path=path):
260 260 raise web.HTTPError(404, u'No such file or directory: %s/%s' % (path, name))
261 261
262 262 os_path = self._get_os_path(name, path)
263 263 if os.path.isdir(os_path):
264 264 model = self._dir_model(name, path, content)
265 265 elif name.endswith('.ipynb'):
266 266 model = self._notebook_model(name, path, content)
267 267 else:
268 268 model = self._file_model(name, path, content)
269 269 return model
270 270
271 271 def _save_notebook(self, os_path, model, name='', path=''):
272 272 """save a notebook file"""
273 273 # Save the notebook file
274 274 nb = current.to_notebook_json(model['content'])
275 275
276 276 self.check_and_sign(nb, name, path)
277 277
278 278 if 'name' in nb['metadata']:
279 279 nb['metadata']['name'] = u''
280 280
281 281 with io.open(os_path, 'w', encoding='utf-8') as f:
282 282 current.write(nb, f, u'json')
283 283
284 284 def _save_file(self, os_path, model, name='', path=''):
285 285 """save a non-notebook file"""
286 286 fmt = model.get('format', None)
287 287 if fmt not in {'text', 'base64'}:
288 288 raise web.HTTPError(400, "Must specify format of file contents as 'text' or 'base64'")
289 289 try:
290 290 content = model['content']
291 291 if fmt == 'text':
292 292 bcontent = content.encode('utf8')
293 293 else:
294 294 b64_bytes = content.encode('ascii')
295 295 bcontent = base64.decodestring(b64_bytes)
296 296 except Exception as e:
297 297 raise web.HTTPError(400, u'Encoding error saving %s: %s' % (os_path, e))
298 298 with io.open(os_path, 'wb') as f:
299 299 f.write(bcontent)
300 300
301 301 def _save_directory(self, os_path, model, name='', path=''):
302 302 """create a directory"""
303 303 if not os.path.exists(os_path):
304 304 os.mkdir(os_path)
305 305 elif not os.path.isdir(os_path):
306 306 raise web.HTTPError(400, u'Not a directory: %s' % (os_path))
307 307
308 308 def save(self, model, name='', path=''):
309 309 """Save the file model and return the model with no content."""
310 310 path = path.strip('/')
311 311
312 312 if 'content' not in model:
313 313 raise web.HTTPError(400, u'No file content provided')
314 314 if 'type' not in model:
315 315 raise web.HTTPError(400, u'No file type provided')
316 316
317 317 # One checkpoint should always exist
318 318 if self.file_exists(name, path) and not self.list_checkpoints(name, path):
319 319 self.create_checkpoint(name, path)
320 320
321 321 new_path = model.get('path', path).strip('/')
322 322 new_name = model.get('name', name)
323 323
324 324 if path != new_path or name != new_name:
325 325 self.rename(name, path, new_name, new_path)
326 326
327 327 os_path = self._get_os_path(new_name, new_path)
328 328 self.log.debug("Saving %s", os_path)
329 329 try:
330 330 if model['type'] == 'notebook':
331 331 self._save_notebook(os_path, model, new_name, new_path)
332 332 elif model['type'] == 'file':
333 333 self._save_file(os_path, model, new_name, new_path)
334 334 elif model['type'] == 'directory':
335 335 self._save_directory(os_path, model, new_name, new_path)
336 336 else:
337 337 raise web.HTTPError(400, "Unhandled contents type: %s" % model['type'])
338 338 except web.HTTPError:
339 339 raise
340 340 except Exception as e:
341 341 raise web.HTTPError(400, u'Unexpected error while saving file: %s %s' % (os_path, e))
342 342
343 343 model = self.get_model(new_name, new_path, content=False)
344 344 return model
345 345
346 346 def update(self, model, name, path=''):
347 347 """Update the file's path and/or name"""
348 348 path = path.strip('/')
349 349 new_name = model.get('name', name)
350 350 new_path = model.get('path', path).strip('/')
351 351 if path != new_path or name != new_name:
352 352 self.rename(name, path, new_name, new_path)
353 353 model = self.get_model(new_name, new_path, content=False)
354 354 return model
355 355
356 356 def delete(self, name, path=''):
357 357 """Delete file by name and path."""
358 358 path = path.strip('/')
359 359 os_path = self._get_os_path(name, path)
360 if not os.path.isfile(os_path):
360 rm = os.unlink
361 if os.path.isdir(os_path):
362 listing = os.listdir(os_path)
363 # don't delete non-empty directories (checkpoints dir doesn't count)
364 if listing and listing != ['.ipynb_checkpoints']:
365 raise web.HTTPError(400, u'Directory %s not empty' % os_path)
366 elif not os.path.isfile(os_path):
361 367 raise web.HTTPError(404, u'File does not exist: %s' % os_path)
362 368
363 369 # clear checkpoints
364 370 for checkpoint in self.list_checkpoints(name, path):
365 371 checkpoint_id = checkpoint['id']
366 372 cp_path = self.get_checkpoint_path(checkpoint_id, name, path)
367 373 if os.path.isfile(cp_path):
368 374 self.log.debug("Unlinking checkpoint %s", cp_path)
369 375 os.unlink(cp_path)
370 376
371 self.log.debug("Unlinking file %s", os_path)
372 os.unlink(os_path)
377 if os.path.isdir(os_path):
378 self.log.debug("Removing directory %s", os_path)
379 shutil.rmtree(os_path)
380 else:
381 self.log.debug("Unlinking file %s", os_path)
382 rm(os_path)
373 383
374 384 def rename(self, old_name, old_path, new_name, new_path):
375 385 """Rename a file."""
376 386 old_path = old_path.strip('/')
377 387 new_path = new_path.strip('/')
378 388 if new_name == old_name and new_path == old_path:
379 389 return
380 390
381 391 new_os_path = self._get_os_path(new_name, new_path)
382 392 old_os_path = self._get_os_path(old_name, old_path)
383 393
384 394 # Should we proceed with the move?
385 395 if os.path.isfile(new_os_path):
386 396 raise web.HTTPError(409, u'Notebook with name already exists: %s' % new_os_path)
387 397
388 398 # Move the file
389 399 try:
390 400 shutil.move(old_os_path, new_os_path)
391 401 except Exception as e:
392 402 raise web.HTTPError(500, u'Unknown error renaming file: %s %s' % (old_os_path, e))
393 403
394 404 # Move the checkpoints
395 405 old_checkpoints = self.list_checkpoints(old_name, old_path)
396 406 for cp in old_checkpoints:
397 407 checkpoint_id = cp['id']
398 408 old_cp_path = self.get_checkpoint_path(checkpoint_id, old_name, old_path)
399 409 new_cp_path = self.get_checkpoint_path(checkpoint_id, new_name, new_path)
400 410 if os.path.isfile(old_cp_path):
401 411 self.log.debug("Renaming checkpoint %s -> %s", old_cp_path, new_cp_path)
402 412 shutil.move(old_cp_path, new_cp_path)
403 413
404 414 # Checkpoint-related utilities
405 415
406 416 def get_checkpoint_path(self, checkpoint_id, name, path=''):
407 417 """find the path to a checkpoint"""
408 418 path = path.strip('/')
409 419 basename, ext = os.path.splitext(name)
410 420 filename = u"{name}-{checkpoint_id}{ext}".format(
411 421 name=basename,
412 422 checkpoint_id=checkpoint_id,
413 423 ext=ext,
414 424 )
415 425 os_path = self._get_os_path(path=path)
416 426 cp_dir = os.path.join(os_path, self.checkpoint_dir)
417 427 ensure_dir_exists(cp_dir)
418 428 cp_path = os.path.join(cp_dir, filename)
419 429 return cp_path
420 430
421 431 def get_checkpoint_model(self, checkpoint_id, name, path=''):
422 432 """construct the info dict for a given checkpoint"""
423 433 path = path.strip('/')
424 434 cp_path = self.get_checkpoint_path(checkpoint_id, name, path)
425 435 stats = os.stat(cp_path)
426 436 last_modified = tz.utcfromtimestamp(stats.st_mtime)
427 437 info = dict(
428 438 id = checkpoint_id,
429 439 last_modified = last_modified,
430 440 )
431 441 return info
432 442
433 443 # public checkpoint API
434 444
435 445 def create_checkpoint(self, name, path=''):
436 446 """Create a checkpoint from the current state of a file"""
437 447 path = path.strip('/')
438 448 src_path = self._get_os_path(name, path)
439 449 # only the one checkpoint ID:
440 450 checkpoint_id = u"checkpoint"
441 451 cp_path = self.get_checkpoint_path(checkpoint_id, name, path)
442 452 self.log.debug("creating checkpoint for %s", name)
443 453 self._copy(src_path, cp_path)
444 454
445 455 # return the checkpoint info
446 456 return self.get_checkpoint_model(checkpoint_id, name, path)
447 457
448 458 def list_checkpoints(self, name, path=''):
449 459 """list the checkpoints for a given file
450 460
451 461 This contents manager currently only supports one checkpoint per file.
452 462 """
453 463 path = path.strip('/')
454 464 checkpoint_id = "checkpoint"
455 465 os_path = self.get_checkpoint_path(checkpoint_id, name, path)
456 466 if not os.path.exists(os_path):
457 467 return []
458 468 else:
459 469 return [self.get_checkpoint_model(checkpoint_id, name, path)]
460 470
461 471
462 472 def restore_checkpoint(self, checkpoint_id, name, path=''):
463 473 """restore a file to a checkpointed state"""
464 474 path = path.strip('/')
465 475 self.log.info("restoring %s from checkpoint %s", name, checkpoint_id)
466 476 nb_path = self._get_os_path(name, path)
467 477 cp_path = self.get_checkpoint_path(checkpoint_id, name, path)
468 478 if not os.path.isfile(cp_path):
469 479 self.log.debug("checkpoint file does not exist: %s", cp_path)
470 480 raise web.HTTPError(404,
471 481 u'checkpoint does not exist: %s-%s' % (name, checkpoint_id)
472 482 )
473 483 # ensure notebook is readable (never restore from an unreadable notebook)
474 484 if cp_path.endswith('.ipynb'):
475 485 with io.open(cp_path, 'r', encoding='utf-8') as f:
476 486 current.read(f, u'json')
477 487 self._copy(cp_path, nb_path)
478 488 self.log.debug("copying %s -> %s", cp_path, nb_path)
479 489
480 490 def delete_checkpoint(self, checkpoint_id, name, path=''):
481 491 """delete a file's checkpoint"""
482 492 path = path.strip('/')
483 493 cp_path = self.get_checkpoint_path(checkpoint_id, name, path)
484 494 if not os.path.isfile(cp_path):
485 495 raise web.HTTPError(404,
486 496 u'Checkpoint does not exist: %s%s-%s' % (path, name, checkpoint_id)
487 497 )
488 498 self.log.debug("unlinking %s", cp_path)
489 499 os.unlink(cp_path)
490 500
491 501 def info_string(self):
492 502 return "Serving notebooks from local directory: %s" % self.root_dir
493 503
494 504 def get_kernel_path(self, name, path='', model=None):
495 505 """Return the initial working dir a kernel associated with a given notebook"""
496 506 return os.path.join(self.root_dir, path)
@@ -1,257 +1,261 b''
1 1 """A base class for contents managers."""
2 2
3 3 # Copyright (c) IPython Development Team.
4 4 # Distributed under the terms of the Modified BSD License.
5 5
6 6 from fnmatch import fnmatch
7 7 import itertools
8 8 import os
9 9
10 from tornado.web import HTTPError
11
10 12 from IPython.config.configurable import LoggingConfigurable
11 13 from IPython.nbformat import current, sign
12 14 from IPython.utils.traitlets import Instance, Unicode, List
13 15
14 16
15 17 class ContentsManager(LoggingConfigurable):
16 18
17 19 notary = Instance(sign.NotebookNotary)
18 20 def _notary_default(self):
19 21 return sign.NotebookNotary(parent=self)
20 22
21 23 hide_globs = List(Unicode, [
22 24 u'__pycache__', '*.pyc', '*.pyo',
23 25 '.DS_Store', '*.so', '*.dylib', '*~',
24 26 ], config=True, help="""
25 27 Glob patterns to hide in file and directory listings.
26 28 """)
27 29
28 30 # ContentsManager API part 1: methods that must be
29 31 # implemented in subclasses.
30 32
31 33 def path_exists(self, path):
32 34 """Does the API-style path (directory) actually exist?
33 35
34 36 Override this method in subclasses.
35 37
36 38 Parameters
37 39 ----------
38 40 path : string
39 41 The path to check
40 42
41 43 Returns
42 44 -------
43 45 exists : bool
44 46 Whether the path does indeed exist.
45 47 """
46 48 raise NotImplementedError
47 49
48 50 def is_hidden(self, path):
49 51 """Does the API style path correspond to a hidden directory or file?
50 52
51 53 Parameters
52 54 ----------
53 55 path : string
54 56 The path to check. This is an API path (`/` separated,
55 57 relative to root dir).
56 58
57 59 Returns
58 60 -------
59 61 exists : bool
60 62 Whether the path is hidden.
61 63
62 64 """
63 65 raise NotImplementedError
64 66
65 67 def file_exists(self, name, path=''):
66 68 """Returns a True if the file exists. Else, returns False.
67 69
68 70 Parameters
69 71 ----------
70 72 name : string
71 73 The name of the file you are checking.
72 74 path : string
73 75 The relative path to the file's directory (with '/' as separator)
74 76
75 77 Returns
76 78 -------
77 79 bool
78 80 """
79 81 raise NotImplementedError('must be implemented in a subclass')
80 82
81 83 def list(self, path=''):
82 84 """Return a list of contents dicts without content.
83 85
84 86 This returns a list of dicts
85 87
86 88 This list of dicts should be sorted by name::
87 89
88 90 data = sorted(data, key=lambda item: item['name'])
89 91 """
90 92 raise NotImplementedError('must be implemented in a subclass')
91 93
92 94 def get_model(self, name, path='', content=True):
93 95 """Get the model of a file or directory with or without content."""
94 96 raise NotImplementedError('must be implemented in a subclass')
95 97
96 98 def save(self, model, name, path=''):
97 99 """Save the file or directory and return the model with no content."""
98 100 raise NotImplementedError('must be implemented in a subclass')
99 101
100 102 def update(self, model, name, path=''):
101 103 """Update the file or directory and return the model with no content."""
102 104 raise NotImplementedError('must be implemented in a subclass')
103 105
104 106 def delete(self, name, path=''):
105 107 """Delete file or directory by name and path."""
106 108 raise NotImplementedError('must be implemented in a subclass')
107 109
108 110 def create_checkpoint(self, name, path=''):
109 111 """Create a checkpoint of the current state of a file
110 112
111 113 Returns a checkpoint_id for the new checkpoint.
112 114 """
113 115 raise NotImplementedError("must be implemented in a subclass")
114 116
115 117 def list_checkpoints(self, name, path=''):
116 118 """Return a list of checkpoints for a given file"""
117 119 return []
118 120
119 121 def restore_checkpoint(self, checkpoint_id, name, path=''):
120 122 """Restore a file from one of its checkpoints"""
121 123 raise NotImplementedError("must be implemented in a subclass")
122 124
123 125 def delete_checkpoint(self, checkpoint_id, name, path=''):
124 126 """delete a checkpoint for a file"""
125 127 raise NotImplementedError("must be implemented in a subclass")
126 128
127 129 def info_string(self):
128 130 return "Serving notebooks"
129 131
130 132 # ContentsManager API part 2: methods that have useable default
131 133 # implementations, but can be overridden in subclasses.
132 134
133 135 def get_kernel_path(self, name, path='', model=None):
134 136 """ Return the path to start kernel in """
135 137 return path
136 138
137 139 def increment_filename(self, filename, path=''):
138 140 """Increment a filename until it is unique.
139 141
140 142 Parameters
141 143 ----------
142 144 filename : unicode
143 145 The name of a file, including extension
144 146 path : unicode
145 147 The URL path of the target's directory
146 148
147 149 Returns
148 150 -------
149 151 name : unicode
150 152 A filename that is unique, based on the input filename.
151 153 """
152 154 path = path.strip('/')
153 155 basename, ext = os.path.splitext(filename)
154 156 for i in itertools.count():
155 157 name = u'{basename}{i}{ext}'.format(basename=basename, i=i,
156 158 ext=ext)
157 159 if not self.file_exists(name, path):
158 160 break
159 161 return name
160 162
161 163 def create_file(self, model=None, path='', ext='.ipynb'):
162 164 """Create a new file or directory and return its model with no content."""
163 165 path = path.strip('/')
164 166 if model is None:
165 167 model = {}
166 168 if 'content' not in model:
167 169 if ext == '.ipynb':
168 170 metadata = current.new_metadata(name=u'')
169 171 model['content'] = current.new_notebook(metadata=metadata)
170 172 model.setdefault('type', 'notebook')
171 173 model.setdefault('format', 'json')
172 174 else:
173 175 model['content'] = ''
174 176 model.setdefault('type', 'file')
175 177 model.setdefault('format', 'text')
176 178 if 'name' not in model:
177 179 model['name'] = self.increment_filename('Untitled' + ext, path)
178 180
179 181 model['path'] = path
180 182 model = self.save(model, model['name'], model['path'])
181 183 return model
182 184
183 185 def copy(self, from_name, to_name=None, path=''):
184 186 """Copy an existing file and return its new model.
185 187
186 188 If to_name not specified, increment `from_name-Copy#.ipynb`.
187 189 """
188 190 path = path.strip('/')
189 191 model = self.get_model(from_name, path)
192 if model['type'] == 'directory':
193 raise HTTPError(400, "Can't copy directories")
190 194 if not to_name:
191 195 base, ext = os.path.splitext(from_name)
192 196 copy_name = u'{0}-Copy{1}'.format(base, ext)
193 197 to_name = self.increment_filename(copy_name, path)
194 198 model['name'] = to_name
195 199 model = self.save(model, to_name, path)
196 200 return model
197 201
198 202 def log_info(self):
199 203 self.log.info(self.info_string())
200 204
201 205 def trust_notebook(self, name, path=''):
202 206 """Explicitly trust a notebook
203 207
204 208 Parameters
205 209 ----------
206 210 name : string
207 211 The filename of the notebook
208 212 path : string
209 213 The notebook's directory
210 214 """
211 215 model = self.get_model(name, path)
212 216 nb = model['content']
213 217 self.log.warn("Trusting notebook %s/%s", path, name)
214 218 self.notary.mark_cells(nb, True)
215 219 self.save(model, name, path)
216 220
217 221 def check_and_sign(self, nb, name, path=''):
218 222 """Check for trusted cells, and sign the notebook.
219 223
220 224 Called as a part of saving notebooks.
221 225
222 226 Parameters
223 227 ----------
224 228 nb : dict
225 229 The notebook structure
226 230 name : string
227 231 The filename of the notebook
228 232 path : string
229 233 The notebook's directory
230 234 """
231 235 if self.notary.check_cells(nb):
232 236 self.notary.sign(nb)
233 237 else:
234 238 self.log.warn("Saving untrusted notebook %s/%s", path, name)
235 239
236 240 def mark_trusted_cells(self, nb, name, path=''):
237 241 """Mark cells as trusted if the notebook signature matches.
238 242
239 243 Called as a part of loading notebooks.
240 244
241 245 Parameters
242 246 ----------
243 247 nb : dict
244 248 The notebook structure
245 249 name : string
246 250 The filename of the notebook
247 251 path : string
248 252 The notebook's directory
249 253 """
250 254 trusted = self.notary.check_signature(nb)
251 255 if not trusted:
252 256 self.log.warn("Notebook %s/%s is not trusted", path, name)
253 257 self.notary.mark_cells(nb, trusted)
254 258
255 259 def should_list(self, name):
256 260 """Should this file/directory name be displayed in a listing?"""
257 261 return not any(fnmatch(name, glob) for glob in self.hide_globs)
@@ -1,459 +1,479 b''
1 1 # coding: utf-8
2 2 """Test the contents webservice API."""
3 3
4 4 import base64
5 5 import io
6 6 import json
7 7 import os
8 8 import shutil
9 9 from unicodedata import normalize
10 10
11 11 pjoin = os.path.join
12 12
13 13 import requests
14 14
15 15 from IPython.html.utils import url_path_join, url_escape
16 16 from IPython.html.tests.launchnotebook import NotebookTestBase, assert_http_error
17 17 from IPython.nbformat import current
18 18 from IPython.nbformat.current import (new_notebook, write, read, new_worksheet,
19 19 new_heading_cell, to_notebook_json)
20 20 from IPython.nbformat import v2
21 21 from IPython.utils import py3compat
22 22 from IPython.utils.data import uniq_stable
23 23
24 24
25 25 # TODO: Remove this after we create the contents web service and directories are
26 26 # no longer listed by the notebook web service.
27 27 def notebooks_only(dir_model):
28 28 return [nb for nb in dir_model['content'] if nb['type']=='notebook']
29 29
30 30 def dirs_only(dir_model):
31 31 return [x for x in dir_model['content'] if x['type']=='directory']
32 32
33 33
34 34 class API(object):
35 35 """Wrapper for contents API calls."""
36 36 def __init__(self, base_url):
37 37 self.base_url = base_url
38 38
39 39 def _req(self, verb, path, body=None):
40 40 response = requests.request(verb,
41 41 url_path_join(self.base_url, 'api/contents', path),
42 42 data=body,
43 43 )
44 44 response.raise_for_status()
45 45 return response
46 46
47 47 def list(self, path='/'):
48 48 return self._req('GET', path)
49 49
50 50 def read(self, name, path='/'):
51 51 return self._req('GET', url_path_join(path, name))
52 52
53 53 def create_untitled(self, path='/', ext=None):
54 54 body = None
55 55 if ext:
56 56 body = json.dumps({'ext': ext})
57 57 return self._req('POST', path, body)
58 58
59 59 def upload_untitled(self, body, path='/'):
60 60 return self._req('POST', path, body)
61 61
62 62 def copy_untitled(self, copy_from, path='/'):
63 63 body = json.dumps({'copy_from':copy_from})
64 64 return self._req('POST', path, body)
65 65
66 66 def create(self, name, path='/'):
67 67 return self._req('PUT', url_path_join(path, name))
68 68
69 69 def upload(self, name, body, path='/'):
70 70 return self._req('PUT', url_path_join(path, name), body)
71 71
72 def mkdir(self, name, path='/'):
73 return self._req('PUT', url_path_join(path, name), json.dumps({'type': 'directory'}))
74
72 75 def copy(self, copy_from, copy_to, path='/'):
73 76 body = json.dumps({'copy_from':copy_from})
74 77 return self._req('PUT', url_path_join(path, copy_to), body)
75 78
76 79 def save(self, name, body, path='/'):
77 80 return self._req('PUT', url_path_join(path, name), body)
78 81
79 82 def delete(self, name, path='/'):
80 83 return self._req('DELETE', url_path_join(path, name))
81 84
82 85 def rename(self, name, path, new_name):
83 86 body = json.dumps({'name': new_name})
84 87 return self._req('PATCH', url_path_join(path, name), body)
85 88
86 89 def get_checkpoints(self, name, path):
87 90 return self._req('GET', url_path_join(path, name, 'checkpoints'))
88 91
89 92 def new_checkpoint(self, name, path):
90 93 return self._req('POST', url_path_join(path, name, 'checkpoints'))
91 94
92 95 def restore_checkpoint(self, name, path, checkpoint_id):
93 96 return self._req('POST', url_path_join(path, name, 'checkpoints', checkpoint_id))
94 97
95 98 def delete_checkpoint(self, name, path, checkpoint_id):
96 99 return self._req('DELETE', url_path_join(path, name, 'checkpoints', checkpoint_id))
97 100
98 101 class APITest(NotebookTestBase):
99 102 """Test the kernels web service API"""
100 103 dirs_nbs = [('', 'inroot'),
101 104 ('Directory with spaces in', 'inspace'),
102 105 (u'unicodé', 'innonascii'),
103 106 ('foo', 'a'),
104 107 ('foo', 'b'),
105 108 ('foo', 'name with spaces'),
106 109 ('foo', u'unicodé'),
107 110 ('foo/bar', 'baz'),
108 111 ('ordering', 'A'),
109 112 ('ordering', 'b'),
110 113 ('ordering', 'C'),
111 114 (u'å b', u'ç d'),
112 115 ]
113 116 hidden_dirs = ['.hidden', '__pycache__']
114 117
115 118 dirs = uniq_stable([py3compat.cast_unicode(d) for (d,n) in dirs_nbs])
116 119 del dirs[0] # remove ''
117 120 top_level_dirs = {normalize('NFC', d.split('/')[0]) for d in dirs}
118 121
119 122 @staticmethod
120 123 def _blob_for_name(name):
121 124 return name.encode('utf-8') + b'\xFF'
122 125
123 126 @staticmethod
124 127 def _txt_for_name(name):
125 128 return u'%s text file' % name
126 129
127 130 def setUp(self):
128 131 nbdir = self.notebook_dir.name
129 132 self.blob = os.urandom(100)
130 133 self.b64_blob = base64.encodestring(self.blob).decode('ascii')
131 134
132 135
133 136
134 137 for d in (self.dirs + self.hidden_dirs):
135 138 d.replace('/', os.sep)
136 139 if not os.path.isdir(pjoin(nbdir, d)):
137 140 os.mkdir(pjoin(nbdir, d))
138 141
139 142 for d, name in self.dirs_nbs:
140 143 d = d.replace('/', os.sep)
141 144 # create a notebook
142 145 with io.open(pjoin(nbdir, d, '%s.ipynb' % name), 'w',
143 146 encoding='utf-8') as f:
144 147 nb = new_notebook(name=name)
145 148 write(nb, f, format='ipynb')
146 149
147 150 # create a text file
148 151 with io.open(pjoin(nbdir, d, '%s.txt' % name), 'w',
149 152 encoding='utf-8') as f:
150 153 f.write(self._txt_for_name(name))
151 154
152 155 # create a binary file
153 156 with io.open(pjoin(nbdir, d, '%s.blob' % name), 'wb') as f:
154 157 f.write(self._blob_for_name(name))
155 158
156 159 self.api = API(self.base_url())
157 160
158 161 def tearDown(self):
159 162 nbdir = self.notebook_dir.name
160 163
161 164 for dname in (list(self.top_level_dirs) + self.hidden_dirs):
162 165 shutil.rmtree(pjoin(nbdir, dname), ignore_errors=True)
163 166
164 167 if os.path.isfile(pjoin(nbdir, 'inroot.ipynb')):
165 168 os.unlink(pjoin(nbdir, 'inroot.ipynb'))
166 169
167 170 def test_list_notebooks(self):
168 171 nbs = notebooks_only(self.api.list().json())
169 172 self.assertEqual(len(nbs), 1)
170 173 self.assertEqual(nbs[0]['name'], 'inroot.ipynb')
171 174
172 175 nbs = notebooks_only(self.api.list('/Directory with spaces in/').json())
173 176 self.assertEqual(len(nbs), 1)
174 177 self.assertEqual(nbs[0]['name'], 'inspace.ipynb')
175 178
176 179 nbs = notebooks_only(self.api.list(u'/unicodé/').json())
177 180 self.assertEqual(len(nbs), 1)
178 181 self.assertEqual(nbs[0]['name'], 'innonascii.ipynb')
179 182 self.assertEqual(nbs[0]['path'], u'unicodé')
180 183
181 184 nbs = notebooks_only(self.api.list('/foo/bar/').json())
182 185 self.assertEqual(len(nbs), 1)
183 186 self.assertEqual(nbs[0]['name'], 'baz.ipynb')
184 187 self.assertEqual(nbs[0]['path'], 'foo/bar')
185 188
186 189 nbs = notebooks_only(self.api.list('foo').json())
187 190 self.assertEqual(len(nbs), 4)
188 191 nbnames = { normalize('NFC', n['name']) for n in nbs }
189 192 expected = [ u'a.ipynb', u'b.ipynb', u'name with spaces.ipynb', u'unicodé.ipynb']
190 193 expected = { normalize('NFC', name) for name in expected }
191 194 self.assertEqual(nbnames, expected)
192 195
193 196 nbs = notebooks_only(self.api.list('ordering').json())
194 197 nbnames = [n['name'] for n in nbs]
195 198 expected = ['A.ipynb', 'b.ipynb', 'C.ipynb']
196 199 self.assertEqual(nbnames, expected)
197 200
198 201 def test_list_dirs(self):
199 202 dirs = dirs_only(self.api.list().json())
200 203 dir_names = {normalize('NFC', d['name']) for d in dirs}
201 204 self.assertEqual(dir_names, self.top_level_dirs) # Excluding hidden dirs
202 205
203 206 def test_list_nonexistant_dir(self):
204 207 with assert_http_error(404):
205 208 self.api.list('nonexistant')
206 209
207 210 def test_get_nb_contents(self):
208 211 for d, name in self.dirs_nbs:
209 212 nb = self.api.read('%s.ipynb' % name, d+'/').json()
210 213 self.assertEqual(nb['name'], u'%s.ipynb' % name)
211 214 self.assertEqual(nb['type'], 'notebook')
212 215 self.assertIn('content', nb)
213 216 self.assertEqual(nb['format'], 'json')
214 217 self.assertIn('content', nb)
215 218 self.assertIn('metadata', nb['content'])
216 219 self.assertIsInstance(nb['content']['metadata'], dict)
217 220
218 221 def test_get_contents_no_such_file(self):
219 222 # Name that doesn't exist - should be a 404
220 223 with assert_http_error(404):
221 224 self.api.read('q.ipynb', 'foo')
222 225
223 226 def test_get_text_file_contents(self):
224 227 for d, name in self.dirs_nbs:
225 228 model = self.api.read(u'%s.txt' % name, d+'/').json()
226 229 self.assertEqual(model['name'], u'%s.txt' % name)
227 230 self.assertIn('content', model)
228 231 self.assertEqual(model['format'], 'text')
229 232 self.assertEqual(model['type'], 'file')
230 233 self.assertEqual(model['content'], self._txt_for_name(name))
231 234
232 235 # Name that doesn't exist - should be a 404
233 236 with assert_http_error(404):
234 237 self.api.read('q.txt', 'foo')
235 238
236 239 def test_get_binary_file_contents(self):
237 240 for d, name in self.dirs_nbs:
238 241 model = self.api.read(u'%s.blob' % name, d+'/').json()
239 242 self.assertEqual(model['name'], u'%s.blob' % name)
240 243 self.assertIn('content', model)
241 244 self.assertEqual(model['format'], 'base64')
242 245 self.assertEqual(model['type'], 'file')
243 246 b64_data = base64.encodestring(self._blob_for_name(name)).decode('ascii')
244 247 self.assertEqual(model['content'], b64_data)
245 248
246 249 # Name that doesn't exist - should be a 404
247 250 with assert_http_error(404):
248 251 self.api.read('q.txt', 'foo')
249 252
250 253 def _check_created(self, resp, name, path, type='notebook'):
251 254 self.assertEqual(resp.status_code, 201)
252 255 location_header = py3compat.str_to_unicode(resp.headers['Location'])
253 256 self.assertEqual(location_header, url_escape(url_path_join(u'/api/contents', path, name)))
254 257 rjson = resp.json()
255 258 self.assertEqual(rjson['name'], name)
256 259 self.assertEqual(rjson['path'], path)
257 260 self.assertEqual(rjson['type'], type)
258 261 isright = os.path.isdir if type == 'directory' else os.path.isfile
259 262 assert isright(pjoin(
260 263 self.notebook_dir.name,
261 264 path.replace('/', os.sep),
262 265 name,
263 266 ))
264 267
265 268 def test_create_untitled(self):
266 269 resp = self.api.create_untitled(path=u'å b')
267 270 self._check_created(resp, 'Untitled0.ipynb', u'å b')
268 271
269 272 # Second time
270 273 resp = self.api.create_untitled(path=u'å b')
271 274 self._check_created(resp, 'Untitled1.ipynb', u'å b')
272 275
273 276 # And two directories down
274 277 resp = self.api.create_untitled(path='foo/bar')
275 278 self._check_created(resp, 'Untitled0.ipynb', 'foo/bar')
276 279
277 280 def test_create_untitled_txt(self):
278 281 resp = self.api.create_untitled(path='foo/bar', ext='.txt')
279 282 self._check_created(resp, 'Untitled0.txt', 'foo/bar', type='file')
280 283
281 284 resp = self.api.read(path='foo/bar', name='Untitled0.txt')
282 285 model = resp.json()
283 286 self.assertEqual(model['type'], 'file')
284 287 self.assertEqual(model['format'], 'text')
285 288 self.assertEqual(model['content'], '')
286 289
287 290 def test_upload_untitled(self):
288 291 nb = new_notebook(name='Upload test')
289 292 nbmodel = {'content': nb, 'type': 'notebook'}
290 293 resp = self.api.upload_untitled(path=u'å b',
291 294 body=json.dumps(nbmodel))
292 295 self._check_created(resp, 'Untitled0.ipynb', u'å b')
293 296
294 297 def test_upload(self):
295 298 nb = new_notebook(name=u'ignored')
296 299 nbmodel = {'content': nb, 'type': 'notebook'}
297 300 resp = self.api.upload(u'Upload tést.ipynb', path=u'å b',
298 301 body=json.dumps(nbmodel))
299 302 self._check_created(resp, u'Upload tést.ipynb', u'å b')
300 303
301 304 def test_mkdir(self):
302 model = {'type': 'directory'}
303 resp = self.api.upload(u'New ∂ir', path=u'å b',
304 body=json.dumps(model))
305 resp = self.api.mkdir(u'New ∂ir', path=u'å b')
305 306 self._check_created(resp, u'New ∂ir', u'å b', type='directory')
306 307
307 308 def test_upload_txt(self):
308 309 body = u'ünicode téxt'
309 310 model = {
310 311 'content' : body,
311 312 'format' : 'text',
312 313 'type' : 'file',
313 314 }
314 315 resp = self.api.upload(u'Upload tést.txt', path=u'å b',
315 316 body=json.dumps(model))
316 317
317 318 # check roundtrip
318 319 resp = self.api.read(path=u'å b', name=u'Upload tést.txt')
319 320 model = resp.json()
320 321 self.assertEqual(model['type'], 'file')
321 322 self.assertEqual(model['format'], 'text')
322 323 self.assertEqual(model['content'], body)
323 324
324 325 def test_upload_b64(self):
325 326 body = b'\xFFblob'
326 327 b64body = base64.encodestring(body).decode('ascii')
327 328 model = {
328 329 'content' : b64body,
329 330 'format' : 'base64',
330 331 'type' : 'file',
331 332 }
332 333 resp = self.api.upload(u'Upload tést.blob', path=u'å b',
333 334 body=json.dumps(model))
334 335
335 336 # check roundtrip
336 337 resp = self.api.read(path=u'å b', name=u'Upload tést.blob')
337 338 model = resp.json()
338 339 self.assertEqual(model['type'], 'file')
339 340 self.assertEqual(model['format'], 'base64')
340 341 decoded = base64.decodestring(model['content'].encode('ascii'))
341 342 self.assertEqual(decoded, body)
342 343
343 344 def test_upload_v2(self):
344 345 nb = v2.new_notebook()
345 346 ws = v2.new_worksheet()
346 347 nb.worksheets.append(ws)
347 348 ws.cells.append(v2.new_code_cell(input='print("hi")'))
348 349 nbmodel = {'content': nb, 'type': 'notebook'}
349 350 resp = self.api.upload(u'Upload tést.ipynb', path=u'å b',
350 351 body=json.dumps(nbmodel))
351 352 self._check_created(resp, u'Upload tést.ipynb', u'å b')
352 353 resp = self.api.read(u'Upload tést.ipynb', u'å b')
353 354 data = resp.json()
354 355 self.assertEqual(data['content']['nbformat'], current.nbformat)
355 356 self.assertEqual(data['content']['orig_nbformat'], 2)
356 357
357 358 def test_copy_untitled(self):
358 359 resp = self.api.copy_untitled(u'ç d.ipynb', path=u'å b')
359 360 self._check_created(resp, u'ç d-Copy0.ipynb', u'å b')
360 361
361 362 def test_copy(self):
362 363 resp = self.api.copy(u'ç d.ipynb', u'cøpy.ipynb', path=u'å b')
363 364 self._check_created(resp, u'cøpy.ipynb', u'å b')
364 365
366 def test_copy_dir_400(self):
367 # can't copy directories
368 with assert_http_error(400):
369 resp = self.api.copy(u'å b', u'å c')
370
365 371 def test_delete(self):
366 372 for d, name in self.dirs_nbs:
367 373 resp = self.api.delete('%s.ipynb' % name, d)
368 374 self.assertEqual(resp.status_code, 204)
369 375
370 376 for d in self.dirs + ['/']:
371 377 nbs = notebooks_only(self.api.list(d).json())
372 378 self.assertEqual(len(nbs), 0)
373 379
380 def test_delete_dirs(self):
381 # depth-first delete everything, so we don't try to delete empty directories
382 for name in sorted(self.dirs + ['/'], key=len, reverse=True):
383 listing = self.api.list(name).json()['content']
384 for model in listing:
385 self.api.delete(model['name'], model['path'])
386 listing = self.api.list('/').json()['content']
387 self.assertEqual(listing, [])
388
389 def test_delete_non_empty_dir(self):
390 """delete non-empty dir raises 400"""
391 with assert_http_error(400):
392 self.api.delete(u'å b')
393
374 394 def test_rename(self):
375 395 resp = self.api.rename('a.ipynb', 'foo', 'z.ipynb')
376 396 self.assertEqual(resp.headers['Location'].split('/')[-1], 'z.ipynb')
377 397 self.assertEqual(resp.json()['name'], 'z.ipynb')
378 398 assert os.path.isfile(pjoin(self.notebook_dir.name, 'foo', 'z.ipynb'))
379 399
380 400 nbs = notebooks_only(self.api.list('foo').json())
381 401 nbnames = set(n['name'] for n in nbs)
382 402 self.assertIn('z.ipynb', nbnames)
383 403 self.assertNotIn('a.ipynb', nbnames)
384 404
385 405 def test_rename_existing(self):
386 406 with assert_http_error(409):
387 407 self.api.rename('a.ipynb', 'foo', 'b.ipynb')
388 408
389 409 def test_save(self):
390 410 resp = self.api.read('a.ipynb', 'foo')
391 411 nbcontent = json.loads(resp.text)['content']
392 412 nb = to_notebook_json(nbcontent)
393 413 ws = new_worksheet()
394 414 nb.worksheets = [ws]
395 415 ws.cells.append(new_heading_cell(u'Created by test ³'))
396 416
397 417 nbmodel= {'name': 'a.ipynb', 'path':'foo', 'content': nb, 'type': 'notebook'}
398 418 resp = self.api.save('a.ipynb', path='foo', body=json.dumps(nbmodel))
399 419
400 420 nbfile = pjoin(self.notebook_dir.name, 'foo', 'a.ipynb')
401 421 with io.open(nbfile, 'r', encoding='utf-8') as f:
402 422 newnb = read(f, format='ipynb')
403 423 self.assertEqual(newnb.worksheets[0].cells[0].source,
404 424 u'Created by test ³')
405 425 nbcontent = self.api.read('a.ipynb', 'foo').json()['content']
406 426 newnb = to_notebook_json(nbcontent)
407 427 self.assertEqual(newnb.worksheets[0].cells[0].source,
408 428 u'Created by test ³')
409 429
410 430 # Save and rename
411 431 nbmodel= {'name': 'a2.ipynb', 'path':'foo/bar', 'content': nb, 'type': 'notebook'}
412 432 resp = self.api.save('a.ipynb', path='foo', body=json.dumps(nbmodel))
413 433 saved = resp.json()
414 434 self.assertEqual(saved['name'], 'a2.ipynb')
415 435 self.assertEqual(saved['path'], 'foo/bar')
416 436 assert os.path.isfile(pjoin(self.notebook_dir.name,'foo','bar','a2.ipynb'))
417 437 assert not os.path.isfile(pjoin(self.notebook_dir.name, 'foo', 'a.ipynb'))
418 438 with assert_http_error(404):
419 439 self.api.read('a.ipynb', 'foo')
420 440
421 441 def test_checkpoints(self):
422 442 resp = self.api.read('a.ipynb', 'foo')
423 443 r = self.api.new_checkpoint('a.ipynb', 'foo')
424 444 self.assertEqual(r.status_code, 201)
425 445 cp1 = r.json()
426 446 self.assertEqual(set(cp1), {'id', 'last_modified'})
427 447 self.assertEqual(r.headers['Location'].split('/')[-1], cp1['id'])
428 448
429 449 # Modify it
430 450 nbcontent = json.loads(resp.text)['content']
431 451 nb = to_notebook_json(nbcontent)
432 452 ws = new_worksheet()
433 453 nb.worksheets = [ws]
434 454 hcell = new_heading_cell('Created by test')
435 455 ws.cells.append(hcell)
436 456 # Save
437 457 nbmodel= {'name': 'a.ipynb', 'path':'foo', 'content': nb, 'type': 'notebook'}
438 458 resp = self.api.save('a.ipynb', path='foo', body=json.dumps(nbmodel))
439 459
440 460 # List checkpoints
441 461 cps = self.api.get_checkpoints('a.ipynb', 'foo').json()
442 462 self.assertEqual(cps, [cp1])
443 463
444 464 nbcontent = self.api.read('a.ipynb', 'foo').json()['content']
445 465 nb = to_notebook_json(nbcontent)
446 466 self.assertEqual(nb.worksheets[0].cells[0].source, 'Created by test')
447 467
448 468 # Restore cp1
449 469 r = self.api.restore_checkpoint('a.ipynb', 'foo', cp1['id'])
450 470 self.assertEqual(r.status_code, 204)
451 471 nbcontent = self.api.read('a.ipynb', 'foo').json()['content']
452 472 nb = to_notebook_json(nbcontent)
453 473 self.assertEqual(nb.worksheets, [])
454 474
455 475 # Delete cp1
456 476 r = self.api.delete_checkpoint('a.ipynb', 'foo', cp1['id'])
457 477 self.assertEqual(r.status_code, 204)
458 478 cps = self.api.get_checkpoints('a.ipynb', 'foo').json()
459 479 self.assertEqual(cps, [])
General Comments 0
You need to be logged in to leave comments. Login now