##// END OF EJS Templates
DEV: Allow CheckpointManagers to optimize for shared backends....
Scott Sanderson -
Show More
@@ -1,772 +1,819 b''
1 1 """A contents manager that uses the local file system for storage."""
2 2
3 3 # Copyright (c) IPython Development Team.
4 4 # Distributed under the terms of the Modified BSD License.
5 5
6 6 import base64
7 7 from contextlib import contextmanager
8 8 import errno
9 9 import io
10 10 import os
11 11 import shutil
12 12 import mimetypes
13 13
14 14 from tornado import web
15 15
16 16 from .manager import (
17 17 CheckpointManager,
18 18 ContentsManager,
19 19 )
20 20 from IPython import nbformat
21 21 from IPython.utils.io import atomic_writing
22 22 from IPython.utils.importstring import import_item
23 23 from IPython.utils.path import ensure_dir_exists
24 24 from IPython.utils.traitlets import Any, Unicode, Bool, TraitError
25 25 from IPython.utils.py3compat import getcwd, string_types, str_to_unicode
26 26 from IPython.utils import tz
27 27 from IPython.html.utils import (
28 28 is_hidden,
29 29 to_api_path,
30 30 to_os_path,
31 31 )
32 32
33 33 _script_exporter = None
34 34
35 35 def _post_save_script(model, os_path, contents_manager, **kwargs):
36 36 """convert notebooks to Python script after save with nbconvert
37 37
38 38 replaces `ipython notebook --script`
39 39 """
40 40 from IPython.nbconvert.exporters.script import ScriptExporter
41 41
42 42 if model['type'] != 'notebook':
43 43 return
44 44
45 45 global _script_exporter
46 46 if _script_exporter is None:
47 47 _script_exporter = ScriptExporter(parent=contents_manager)
48 48 log = contents_manager.log
49 49
50 50 base, ext = os.path.splitext(os_path)
51 51 py_fname = base + '.py'
52 52 script, resources = _script_exporter.from_filename(os_path)
53 53 script_fname = base + resources.get('output_extension', '.txt')
54 54 log.info("Saving script /%s", to_api_path(script_fname, contents_manager.root_dir))
55 55 with io.open(script_fname, 'w', encoding='utf-8') as f:
56 56 f.write(script)
57 57
58 58
59 59 class FileManagerMixin(object):
60 60 """
61 61 Mixin for ContentsAPI classes that interact with the filesystem.
62 62
63 63 Provides facilities for reading, writing, and copying both notebooks and
64 64 generic files.
65 65
66 66 Shared by FileContentsManager and FileCheckpointManager.
67 67
68 68 Note
69 69 ----
70 70 Classes using this mixin must provide the following attributes:
71 71
72 72 root_dir : unicode
73 73 A directory against against which API-style paths are to be resolved.
74 74
75 75 log : logging.Logger
76 76 """
77 77
78 78 @contextmanager
79 79 def open(self, os_path, *args, **kwargs):
80 80 """wrapper around io.open that turns permission errors into 403"""
81 81 with self.perm_to_403(os_path):
82 82 with io.open(os_path, *args, **kwargs) as f:
83 83 yield f
84 84
85 85 @contextmanager
86 86 def atomic_writing(self, os_path, *args, **kwargs):
87 87 """wrapper around atomic_writing that turns permission errors into 403"""
88 88 with self.perm_to_403(os_path):
89 89 with atomic_writing(os_path, *args, **kwargs) as f:
90 90 yield f
91 91
92 92 @contextmanager
93 93 def perm_to_403(self, os_path=''):
94 94 """context manager for turning permission errors into 403."""
95 95 try:
96 96 yield
97 97 except OSError as e:
98 98 if e.errno in {errno.EPERM, errno.EACCES}:
99 99 # make 403 error message without root prefix
100 100 # this may not work perfectly on unicode paths on Python 2,
101 101 # but nobody should be doing that anyway.
102 102 if not os_path:
103 103 os_path = str_to_unicode(e.filename or 'unknown file')
104 104 path = to_api_path(os_path, root=self.root_dir)
105 105 raise web.HTTPError(403, u'Permission denied: %s' % path)
106 106 else:
107 107 raise
108 108
109 109 def _copy(self, src, dest):
110 110 """copy src to dest
111 111
112 112 like shutil.copy2, but log errors in copystat
113 113 """
114 114 shutil.copyfile(src, dest)
115 115 try:
116 116 shutil.copystat(src, dest)
117 117 except OSError:
118 118 self.log.debug("copystat on %s failed", dest, exc_info=True)
119 119
120 120 def _get_os_path(self, path):
121 121 """Given an API path, return its file system path.
122 122
123 123 Parameters
124 124 ----------
125 125 path : string
126 126 The relative API path to the named file.
127 127
128 128 Returns
129 129 -------
130 130 path : string
131 131 Native, absolute OS path to for a file.
132 132 """
133 133 return to_os_path(path, self.root_dir)
134 134
135 135 def _read_notebook(self, os_path, as_version=4):
136 136 """Read a notebook from an os path."""
137 137 with self.open(os_path, 'r', encoding='utf-8') as f:
138 138 try:
139 139 return nbformat.read(f, as_version=as_version)
140 140 except Exception as e:
141 141 raise web.HTTPError(
142 142 400,
143 143 u"Unreadable Notebook: %s %r" % (os_path, e),
144 144 )
145 145
146 146 def _save_notebook(self, os_path, nb):
147 147 """Save a notebook to an os_path."""
148 148 with self.atomic_writing(os_path, encoding='utf-8') as f:
149 149 nbformat.write(nb, f, version=nbformat.NO_CONVERT)
150 150
151 151 def _read_file(self, os_path, format):
152 152 """Read a non-notebook file.
153 153
154 154 os_path: The path to be read.
155 155 format:
156 156 If 'text', the contents will be decoded as UTF-8.
157 157 If 'base64', the raw bytes contents will be encoded as base64.
158 158 If not specified, try to decode as UTF-8, and fall back to base64
159 159 """
160 160 if not os.path.isfile(os_path):
161 161 raise web.HTTPError(400, "Cannot read non-file %s" % os_path)
162 162
163 163 with self.open(os_path, 'rb') as f:
164 164 bcontent = f.read()
165 165
166 166 if format is None or format == 'text':
167 167 # Try to interpret as unicode if format is unknown or if unicode
168 168 # was explicitly requested.
169 169 try:
170 170 return bcontent.decode('utf8'), 'text'
171 171 except UnicodeError as e:
172 172 if format == 'text':
173 173 raise web.HTTPError(
174 174 400,
175 175 "%s is not UTF-8 encoded" % os_path,
176 176 reason='bad format',
177 177 )
178 178 return base64.encodestring(bcontent).decode('ascii'), 'base64'
179 179
180 180 def _save_file(self, os_path, content, format):
181 181 """Save content of a generic file."""
182 182 if format not in {'text', 'base64'}:
183 183 raise web.HTTPError(
184 184 400,
185 185 "Must specify format of file contents as 'text' or 'base64'",
186 186 )
187 187 try:
188 188 if format == 'text':
189 189 bcontent = content.encode('utf8')
190 190 else:
191 191 b64_bytes = content.encode('ascii')
192 192 bcontent = base64.decodestring(b64_bytes)
193 193 except Exception as e:
194 194 raise web.HTTPError(400, u'Encoding error saving %s: %s' % (os_path, e))
195 195
196 196 with self.atomic_writing(os_path, text=False) as f:
197 197 f.write(bcontent)
198 198
199 199
200 200 class FileCheckpointManager(FileManagerMixin, CheckpointManager):
201 201 """
202 202 A CheckpointManager that caches checkpoints for files in adjacent
203 203 directories.
204 204 """
205 205
206 206 checkpoint_dir = Unicode(
207 207 '.ipynb_checkpoints',
208 208 config=True,
209 209 help="""The directory name in which to keep file checkpoints
210 210
211 211 This is a path relative to the file's own directory.
212 212
213 213 By default, it is .ipynb_checkpoints
214 214 """,
215 215 )
216 216
217 217 root_dir = Unicode(config=True)
218 218
219 219 def _root_dir_default(self):
220 220 try:
221 221 return self.parent.root_dir
222 222 except AttributeError:
223 223 return getcwd()
224 224
225 # public checkpoint API
226 def create_file_checkpoint(self, content, format, path):
227 """Create a checkpoint from the current content of a notebook."""
228 path = path.strip('/')
229 # only the one checkpoint ID:
230 checkpoint_id = u"checkpoint"
231 os_checkpoint_path = self.checkpoint_path(checkpoint_id, path)
232 self.log.debug("creating checkpoint for %s", path)
233 with self.perm_to_403():
234 self._save_file(os_checkpoint_path, content, format=format)
235
236 # return the checkpoint info
237 return self.checkpoint_model(checkpoint_id, os_checkpoint_path)
238
239 def create_notebook_checkpoint(self, nb, path):
240 """Create a checkpoint from the current content of a notebook."""
241 path = path.strip('/')
242 # only the one checkpoint ID:
243 checkpoint_id = u"checkpoint"
244 os_checkpoint_path = self.checkpoint_path(checkpoint_id, path)
245 self.log.debug("creating checkpoint for %s", path)
246 with self.perm_to_403():
247 self._save_notebook(os_checkpoint_path, nb)
225 # ContentsManager-dependent checkpoint API
226 def create_checkpoint(self, contents_mgr, path):
227 """
228 Create a checkpoint.
248 229
249 # return the checkpoint info
250 return self.checkpoint_model(checkpoint_id, os_checkpoint_path)
230 If contents_mgr is backed by the local filesystem, just copy the
231 appropriate file to the checkpoint directory. Otherwise, ask the
232 ContentsManager for a model and write it ourselves.
233 """
234 if contents_mgr.backend == 'local_file':
235 # We know that the file is in the local filesystem, so just copy
236 # from the base location to our location.
237 checkpoint_id = u'checkpoint'
238 src_path = contents_mgr._get_os_path(path)
239 dest_path = self.checkpoint_path(checkpoint_id, path)
240 self._copy(src_path, dest_path)
241 return self.checkpoint_model(checkpoint_id, dest_path)
242 else:
243 return super(FileCheckpointManager, self).create_checkpoint(
244 contents_mgr, path,
245 )
251 246
252 def get_checkpoint(self, checkpoint_id, path, type):
253 """Get the content of a checkpoint.
247 def restore_checkpoint(self, contents_mgr, checkpoint_id, path):
248 """
249 Restore a checkpoint.
254 250
255 Returns a model suitable for passing to ContentsManager.save.
251 If contents_mgr is backed by the local filesystem, just copy the
252 appropriate file from the checkpoint directory. Otherwise, load the
253 model and pass it to ContentsManager.save.
256 254 """
257 path = path.strip('/')
258 self.log.info("restoring %s from checkpoint %s", path, checkpoint_id)
259 os_checkpoint_path = self.checkpoint_path(checkpoint_id, path)
260 if not os.path.isfile(os_checkpoint_path):
261 self.no_such_checkpoint(path, checkpoint_id)
262 if type == 'notebook':
263 return {
264 'type': type,
265 'content': self._read_notebook(
266 os_checkpoint_path,
267 as_version=4,
268 ),
269 }
255 if contents_mgr.backend == 'local_file':
256 # We know that the file is in the local filesystem, so just copy
257 # from our base location to the location expected by content
258 src_path = self.checkpoint_path(checkpoint_id, path)
259 dest_path = contents_mgr._get_os_path(path)
260 self._copy(src_path, dest_path)
270 261 else:
271 content, format = self._read_file(os_checkpoint_path, format=None)
272 return {
273 'type': type,
274 'content': content,
275 'format': format,
276 }
262 super(FileCheckpointManager, self).restore_checkpoint(
263 contents_mgr, checkpoint_id, path
264 )
277 265
266 # ContentsManager-independent checkpoint API
278 267 def rename_checkpoint(self, checkpoint_id, old_path, new_path):
279 268 """Rename a checkpoint from old_path to new_path."""
280 269 old_cp_path = self.checkpoint_path(checkpoint_id, old_path)
281 270 new_cp_path = self.checkpoint_path(checkpoint_id, new_path)
282 271 if os.path.isfile(old_cp_path):
283 272 self.log.debug(
284 273 "Renaming checkpoint %s -> %s",
285 274 old_cp_path,
286 275 new_cp_path,
287 276 )
288 277 with self.perm_to_403():
289 278 shutil.move(old_cp_path, new_cp_path)
290 279
291 280 def delete_checkpoint(self, checkpoint_id, path):
292 281 """delete a file's checkpoint"""
293 282 path = path.strip('/')
294 283 cp_path = self.checkpoint_path(checkpoint_id, path)
295 284 if not os.path.isfile(cp_path):
296 285 self.no_such_checkpoint(path, checkpoint_id)
297 286
298 287 self.log.debug("unlinking %s", cp_path)
299 288 with self.perm_to_403():
300 289 os.unlink(cp_path)
301 290
302 291 def list_checkpoints(self, path):
303 292 """list the checkpoints for a given file
304 293
305 294 This contents manager currently only supports one checkpoint per file.
306 295 """
307 296 path = path.strip('/')
308 297 checkpoint_id = "checkpoint"
309 298 os_path = self.checkpoint_path(checkpoint_id, path)
310 299 if not os.path.isfile(os_path):
311 300 return []
312 301 else:
313 302 return [self.checkpoint_model(checkpoint_id, os_path)]
314 303
315 304 # Checkpoint-related utilities
316 305 def checkpoint_path(self, checkpoint_id, path):
317 306 """find the path to a checkpoint"""
318 307 path = path.strip('/')
319 308 parent, name = ('/' + path).rsplit('/', 1)
320 309 parent = parent.strip('/')
321 310 basename, ext = os.path.splitext(name)
322 311 filename = u"{name}-{checkpoint_id}{ext}".format(
323 312 name=basename,
324 313 checkpoint_id=checkpoint_id,
325 314 ext=ext,
326 315 )
327 316 os_path = self._get_os_path(path=parent)
328 317 cp_dir = os.path.join(os_path, self.checkpoint_dir)
329 318 with self.perm_to_403():
330 319 ensure_dir_exists(cp_dir)
331 320 cp_path = os.path.join(cp_dir, filename)
332 321 return cp_path
333 322
334 323 def checkpoint_model(self, checkpoint_id, os_path):
335 324 """construct the info dict for a given checkpoint"""
336 325 stats = os.stat(os_path)
337 326 last_modified = tz.utcfromtimestamp(stats.st_mtime)
338 327 info = dict(
339 328 id=checkpoint_id,
340 329 last_modified=last_modified,
341 330 )
342 331 return info
343 332
333 def create_file_checkpoint(self, content, format, path):
334 """Create a checkpoint from the current content of a notebook."""
335 path = path.strip('/')
336 # only the one checkpoint ID:
337 checkpoint_id = u"checkpoint"
338 os_checkpoint_path = self.checkpoint_path(checkpoint_id, path)
339 self.log.debug("creating checkpoint for %s", path)
340 with self.perm_to_403():
341 self._save_file(os_checkpoint_path, content, format=format)
342
343 # return the checkpoint info
344 return self.checkpoint_model(checkpoint_id, os_checkpoint_path)
345
346 def create_notebook_checkpoint(self, nb, path):
347 """Create a checkpoint from the current content of a notebook."""
348 path = path.strip('/')
349 # only the one checkpoint ID:
350 checkpoint_id = u"checkpoint"
351 os_checkpoint_path = self.checkpoint_path(checkpoint_id, path)
352 self.log.debug("creating checkpoint for %s", path)
353 with self.perm_to_403():
354 self._save_notebook(os_checkpoint_path, nb)
355
356 # return the checkpoint info
357 return self.checkpoint_model(checkpoint_id, os_checkpoint_path)
358
359 def get_checkpoint(self, checkpoint_id, path, type):
360 """Get the content of a checkpoint.
361
362 Returns a model suitable for passing to ContentsManager.save.
363 """
364 path = path.strip('/')
365 self.log.info("restoring %s from checkpoint %s", path, checkpoint_id)
366 os_checkpoint_path = self.checkpoint_path(checkpoint_id, path)
367 if not os.path.isfile(os_checkpoint_path):
368 self.no_such_checkpoint(path, checkpoint_id)
369
370 if type == 'notebook':
371 return {
372 'type': type,
373 'content': self._read_notebook(
374 os_checkpoint_path,
375 as_version=4,
376 ),
377 }
378 elif type == 'file':
379 content, format = self._read_file(os_checkpoint_path, format=None)
380 return {
381 'type': type,
382 'content': content,
383 'format': format,
384 }
385 else:
386 raise web.HTTPError(
387 500,
388 u'Unexpected type %s' % type
389 )
390
344 391 # Error Handling
345 392 def no_such_checkpoint(self, path, checkpoint_id):
346 393 raise web.HTTPError(
347 394 404,
348 395 u'Checkpoint does not exist: %s@%s' % (path, checkpoint_id)
349 396 )
350 397
351 398
352 399 class FileContentsManager(FileManagerMixin, ContentsManager):
353 400
354 401 root_dir = Unicode(config=True)
355 402
356 403 def _root_dir_default(self):
357 404 try:
358 405 return self.parent.notebook_dir
359 406 except AttributeError:
360 407 return getcwd()
361 408
362 409 save_script = Bool(False, config=True, help='DEPRECATED, use post_save_hook')
363 410 def _save_script_changed(self):
364 411 self.log.warn("""
365 412 `--script` is deprecated. You can trigger nbconvert via pre- or post-save hooks:
366 413
367 414 ContentsManager.pre_save_hook
368 415 FileContentsManager.post_save_hook
369 416
370 417 A post-save hook has been registered that calls:
371 418
372 419 ipython nbconvert --to script [notebook]
373 420
374 421 which behaves similarly to `--script`.
375 422 """)
376 423
377 424 self.post_save_hook = _post_save_script
378 425
379 426 post_save_hook = Any(None, config=True,
380 427 help="""Python callable or importstring thereof
381 428
382 429 to be called on the path of a file just saved.
383 430
384 431 This can be used to process the file on disk,
385 432 such as converting the notebook to a script or HTML via nbconvert.
386 433
387 434 It will be called as (all arguments passed by keyword):
388 435
389 436 hook(os_path=os_path, model=model, contents_manager=instance)
390 437
391 438 path: the filesystem path to the file just written
392 439 model: the model representing the file
393 440 contents_manager: this ContentsManager instance
394 441 """
395 442 )
396 443 def _post_save_hook_changed(self, name, old, new):
397 444 if new and isinstance(new, string_types):
398 445 self.post_save_hook = import_item(self.post_save_hook)
399 446 elif new:
400 447 if not callable(new):
401 448 raise TraitError("post_save_hook must be callable")
402 449
403 450 def run_post_save_hook(self, model, os_path):
404 451 """Run the post-save hook if defined, and log errors"""
405 452 if self.post_save_hook:
406 453 try:
407 454 self.log.debug("Running post-save hook on %s", os_path)
408 455 self.post_save_hook(os_path=os_path, model=model, contents_manager=self)
409 456 except Exception:
410 457 self.log.error("Post-save hook failed on %s", os_path, exc_info=True)
411 458
412 459 def _root_dir_changed(self, name, old, new):
413 460 """Do a bit of validation of the root_dir."""
414 461 if not os.path.isabs(new):
415 462 # If we receive a non-absolute path, make it absolute.
416 463 self.root_dir = os.path.abspath(new)
417 464 return
418 465 if not os.path.isdir(new):
419 466 raise TraitError("%r is not a directory" % new)
420 467
421 468 def _checkpoint_manager_class_default(self):
422 469 return FileCheckpointManager
423 470
471 def _backend_default(self):
472 return 'local_file'
473
424 474 def is_hidden(self, path):
425 475 """Does the API style path correspond to a hidden directory or file?
426 476
427 477 Parameters
428 478 ----------
429 479 path : string
430 480 The path to check. This is an API path (`/` separated,
431 481 relative to root_dir).
432 482
433 483 Returns
434 484 -------
435 485 hidden : bool
436 486 Whether the path exists and is hidden.
437 487 """
438 488 path = path.strip('/')
439 489 os_path = self._get_os_path(path=path)
440 490 return is_hidden(os_path, self.root_dir)
441 491
442 492 def file_exists(self, path):
443 493 """Returns True if the file exists, else returns False.
444 494
445 495 API-style wrapper for os.path.isfile
446 496
447 497 Parameters
448 498 ----------
449 499 path : string
450 500 The relative path to the file (with '/' as separator)
451 501
452 502 Returns
453 503 -------
454 504 exists : bool
455 505 Whether the file exists.
456 506 """
457 507 path = path.strip('/')
458 508 os_path = self._get_os_path(path)
459 509 return os.path.isfile(os_path)
460 510
461 511 def dir_exists(self, path):
462 512 """Does the API-style path refer to an extant directory?
463 513
464 514 API-style wrapper for os.path.isdir
465 515
466 516 Parameters
467 517 ----------
468 518 path : string
469 519 The path to check. This is an API path (`/` separated,
470 520 relative to root_dir).
471 521
472 522 Returns
473 523 -------
474 524 exists : bool
475 525 Whether the path is indeed a directory.
476 526 """
477 527 path = path.strip('/')
478 528 os_path = self._get_os_path(path=path)
479 529 return os.path.isdir(os_path)
480 530
481 531 def exists(self, path):
482 532 """Returns True if the path exists, else returns False.
483 533
484 534 API-style wrapper for os.path.exists
485 535
486 536 Parameters
487 537 ----------
488 538 path : string
489 539 The API path to the file (with '/' as separator)
490 540
491 541 Returns
492 542 -------
493 543 exists : bool
494 544 Whether the target exists.
495 545 """
496 546 path = path.strip('/')
497 547 os_path = self._get_os_path(path=path)
498 548 return os.path.exists(os_path)
499 549
500 550 def _base_model(self, path):
501 551 """Build the common base of a contents model"""
502 552 os_path = self._get_os_path(path)
503 553 info = os.stat(os_path)
504 554 last_modified = tz.utcfromtimestamp(info.st_mtime)
505 555 created = tz.utcfromtimestamp(info.st_ctime)
506 556 # Create the base model.
507 557 model = {}
508 558 model['name'] = path.rsplit('/', 1)[-1]
509 559 model['path'] = path
510 560 model['last_modified'] = last_modified
511 561 model['created'] = created
512 562 model['content'] = None
513 563 model['format'] = None
514 564 model['mimetype'] = None
515 565 try:
516 566 model['writable'] = os.access(os_path, os.W_OK)
517 567 except OSError:
518 568 self.log.error("Failed to check write permissions on %s", os_path)
519 569 model['writable'] = False
520 570 return model
521 571
522 572 def _dir_model(self, path, content=True):
523 573 """Build a model for a directory
524 574
525 575 if content is requested, will include a listing of the directory
526 576 """
527 577 os_path = self._get_os_path(path)
528 578
529 579 four_o_four = u'directory does not exist: %r' % path
530 580
531 581 if not os.path.isdir(os_path):
532 582 raise web.HTTPError(404, four_o_four)
533 583 elif is_hidden(os_path, self.root_dir):
534 584 self.log.info("Refusing to serve hidden directory %r, via 404 Error",
535 585 os_path
536 586 )
537 587 raise web.HTTPError(404, four_o_four)
538 588
539 589 model = self._base_model(path)
540 590 model['type'] = 'directory'
541 591 if content:
542 592 model['content'] = contents = []
543 593 os_dir = self._get_os_path(path)
544 594 for name in os.listdir(os_dir):
545 595 os_path = os.path.join(os_dir, name)
546 596 # skip over broken symlinks in listing
547 597 if not os.path.exists(os_path):
548 598 self.log.warn("%s doesn't exist", os_path)
549 599 continue
550 600 elif not os.path.isfile(os_path) and not os.path.isdir(os_path):
551 601 self.log.debug("%s not a regular file", os_path)
552 602 continue
553 603 if self.should_list(name) and not is_hidden(os_path, self.root_dir):
554 604 contents.append(self.get(
555 605 path='%s/%s' % (path, name),
556 606 content=False)
557 607 )
558 608
559 609 model['format'] = 'json'
560 610
561 611 return model
562 612
563 613 def _file_model(self, path, content=True, format=None):
564 614 """Build a model for a file
565 615
566 616 if content is requested, include the file contents.
567 617
568 618 format:
569 619 If 'text', the contents will be decoded as UTF-8.
570 620 If 'base64', the raw bytes contents will be encoded as base64.
571 621 If not specified, try to decode as UTF-8, and fall back to base64
572 622 """
573 623 model = self._base_model(path)
574 624 model['type'] = 'file'
575 625
576 626 os_path = self._get_os_path(path)
577 627
578 628 if content:
579 629 content, format = self._read_file(os_path, format)
580 630 default_mime = {
581 631 'text': 'text/plain',
582 632 'base64': 'application/octet-stream'
583 633 }[format]
584 634
585 635 model.update(
586 636 content=content,
587 637 format=format,
588 638 mimetype=mimetypes.guess_type(os_path)[0] or default_mime,
589 639 )
590 640
591 641 return model
592 642
593 643 def _notebook_model(self, path, content=True):
594 644 """Build a notebook model
595 645
596 646 if content is requested, the notebook content will be populated
597 647 as a JSON structure (not double-serialized)
598 648 """
599 649 model = self._base_model(path)
600 650 model['type'] = 'notebook'
601 651 if content:
602 652 os_path = self._get_os_path(path)
603 653 nb = self._read_notebook(os_path, as_version=4)
604 654 self.mark_trusted_cells(nb, path)
605 655 model['content'] = nb
606 656 model['format'] = 'json'
607 657 self.validate_notebook_model(model)
608 658 return model
609 659
610 660 def get(self, path, content=True, type=None, format=None):
611 661 """ Takes a path for an entity and returns its model
612 662
613 663 Parameters
614 664 ----------
615 665 path : str
616 666 the API path that describes the relative path for the target
617 667 content : bool
618 668 Whether to include the contents in the reply
619 669 type : str, optional
620 670 The requested type - 'file', 'notebook', or 'directory'.
621 671 Will raise HTTPError 400 if the content doesn't match.
622 672 format : str, optional
623 673 The requested format for file contents. 'text' or 'base64'.
624 674 Ignored if this returns a notebook or directory model.
625 675
626 676 Returns
627 677 -------
628 678 model : dict
629 679 the contents model. If content=True, returns the contents
630 680 of the file or directory as well.
631 681 """
632 682 path = path.strip('/')
633 683
634 684 if not self.exists(path):
635 685 raise web.HTTPError(404, u'No such file or directory: %s' % path)
636 686
637 687 os_path = self._get_os_path(path)
638 688 if os.path.isdir(os_path):
639 689 if type not in (None, 'directory'):
640 690 raise web.HTTPError(400,
641 691 u'%s is a directory, not a %s' % (path, type), reason='bad type')
642 692 model = self._dir_model(path, content=content)
643 693 elif type == 'notebook' or (type is None and path.endswith('.ipynb')):
644 694 model = self._notebook_model(path, content=content)
645 695 else:
646 696 if type == 'directory':
647 697 raise web.HTTPError(400,
648 698 u'%s is not a directory', reason='bad type')
649 699 model = self._file_model(path, content=content, format=format)
650 700 return model
651 701
652 702 def _save_directory(self, os_path, model, path=''):
653 703 """create a directory"""
654 704 if is_hidden(os_path, self.root_dir):
655 705 raise web.HTTPError(400, u'Cannot create hidden directory %r' % os_path)
656 706 if not os.path.exists(os_path):
657 707 with self.perm_to_403():
658 708 os.mkdir(os_path)
659 709 elif not os.path.isdir(os_path):
660 710 raise web.HTTPError(400, u'Not a directory: %s' % (os_path))
661 711 else:
662 712 self.log.debug("Directory %r already exists", os_path)
663 713
664 714 def save(self, model, path=''):
665 715 """Save the file model and return the model with no content."""
666 716 path = path.strip('/')
667 717
668 718 if 'type' not in model:
669 719 raise web.HTTPError(400, u'No file type provided')
670 720 if 'content' not in model and model['type'] != 'directory':
671 721 raise web.HTTPError(400, u'No file content provided')
672 722
673 723 self.run_pre_save_hook(model=model, path=path)
674 724
675 725 os_path = self._get_os_path(path)
676 726 self.log.debug("Saving %s", os_path)
677 727 try:
678 728 if model['type'] == 'notebook':
679 729 nb = nbformat.from_dict(model['content'])
680 730 self.check_and_sign(nb, path)
681 731 self._save_notebook(os_path, nb)
682 732 # One checkpoint should always exist for notebooks.
683 733 if not self.checkpoint_manager.list_checkpoints(path):
684 self.checkpoint_manager.create_notebook_checkpoint(
685 nb,
686 path,
687 )
734 self.create_checkpoint(path)
688 735 elif model['type'] == 'file':
689 736 # Missing format will be handled internally by _save_file.
690 737 self._save_file(os_path, model['content'], model.get('format'))
691 738 elif model['type'] == 'directory':
692 739 self._save_directory(os_path, model, path)
693 740 else:
694 741 raise web.HTTPError(400, "Unhandled contents type: %s" % model['type'])
695 742 except web.HTTPError:
696 743 raise
697 744 except Exception as e:
698 745 self.log.error(u'Error while saving file: %s %s', path, e, exc_info=True)
699 746 raise web.HTTPError(500, u'Unexpected error while saving file: %s %s' % (path, e))
700 747
701 748 validation_message = None
702 749 if model['type'] == 'notebook':
703 750 self.validate_notebook_model(model)
704 751 validation_message = model.get('message', None)
705 752
706 753 model = self.get(path, content=False)
707 754 if validation_message:
708 755 model['message'] = validation_message
709 756
710 757 self.run_post_save_hook(model=model, os_path=os_path)
711 758
712 759 return model
713 760
714 761 def delete_file(self, path):
715 762 """Delete file at path."""
716 763 path = path.strip('/')
717 764 os_path = self._get_os_path(path)
718 765 rm = os.unlink
719 766 if os.path.isdir(os_path):
720 767 listing = os.listdir(os_path)
721 768 # Don't delete non-empty directories.
722 769 # A directory containing only leftover checkpoints is
723 770 # considered empty.
724 771 cp_dir = getattr(self.checkpoint_manager, 'checkpoint_dir', None)
725 772 for entry in listing:
726 773 if entry != cp_dir:
727 774 raise web.HTTPError(400, u'Directory %s not empty' % os_path)
728 775 elif not os.path.isfile(os_path):
729 776 raise web.HTTPError(404, u'File does not exist: %s' % os_path)
730 777
731 778 if os.path.isdir(os_path):
732 779 self.log.debug("Removing directory %s", os_path)
733 780 with self.perm_to_403():
734 781 shutil.rmtree(os_path)
735 782 else:
736 783 self.log.debug("Unlinking file %s", os_path)
737 784 with self.perm_to_403():
738 785 rm(os_path)
739 786
740 787 def rename_file(self, old_path, new_path):
741 788 """Rename a file."""
742 789 old_path = old_path.strip('/')
743 790 new_path = new_path.strip('/')
744 791 if new_path == old_path:
745 792 return
746 793
747 794 new_os_path = self._get_os_path(new_path)
748 795 old_os_path = self._get_os_path(old_path)
749 796
750 797 # Should we proceed with the move?
751 798 if os.path.exists(new_os_path):
752 799 raise web.HTTPError(409, u'File already exists: %s' % new_path)
753 800
754 801 # Move the file
755 802 try:
756 803 with self.perm_to_403():
757 804 shutil.move(old_os_path, new_os_path)
758 805 except web.HTTPError:
759 806 raise
760 807 except Exception as e:
761 808 raise web.HTTPError(500, u'Unknown error renaming file: %s %s' % (old_path, e))
762 809
763 810 def info_string(self):
764 811 return "Serving notebooks from local directory: %s" % self.root_dir
765 812
766 813 def get_kernel_path(self, path, model=None):
767 814 """Return the initial API path of a kernel associated with a given notebook"""
768 815 if '/' in path:
769 816 parent_dir = path.rsplit('/', 1)[0]
770 817 else:
771 818 parent_dir = ''
772 819 return parent_dir
@@ -1,536 +1,539 b''
1 1 """A base class for contents managers."""
2 2
3 3 # Copyright (c) IPython Development Team.
4 4 # Distributed under the terms of the Modified BSD License.
5 5
6 6 from fnmatch import fnmatch
7 7 import itertools
8 8 import json
9 9 import os
10 10 import re
11 11
12 12 from tornado.web import HTTPError
13 13
14 from IPython import nbformat
15 14 from IPython.config.configurable import LoggingConfigurable
16 15 from IPython.nbformat import sign, validate, ValidationError
17 16 from IPython.nbformat.v4 import new_notebook
18 17 from IPython.utils.importstring import import_item
19 18 from IPython.utils.traitlets import (
20 19 Any,
21 20 Dict,
22 21 Instance,
23 22 List,
24 23 TraitError,
25 24 Type,
26 25 Unicode,
27 26 )
28 27 from IPython.utils.py3compat import string_types
29 28
30 29 copy_pat = re.compile(r'\-Copy\d*\.')
31 30
32 31
33 32 class CheckpointManager(LoggingConfigurable):
34 33 """
35 34 Base class for managing checkpoints for a ContentsManager.
36 35 """
36
37 def create_checkpoint(self, contents_mgr, path):
38 model = contents_mgr.get(path, content=True)
39 type = model['type']
40 if type == 'notebook':
41 return self.create_notebook_checkpoint(
42 model['content'],
43 path,
44 )
45 elif type == 'file':
46 return self.create_file_checkpoint(
47 model['content'],
48 model['format'],
49 path,
50 )
51
52 def restore_checkpoint(self, contents_mgr, checkpoint_id, path):
53 """Restore a checkpoint."""
54 type = contents_mgr.get(path, content=False)['type']
55 model = self.get_checkpoint(checkpoint_id, path, type)
56 contents_mgr.save(model, path)
57
37 58 def create_file_checkpoint(self, content, format, path):
38 59 """Create a checkpoint of the current state of a file
39 60
40 61 Returns a checkpoint model for the new checkpoint.
41 62 """
42 63 raise NotImplementedError("must be implemented in a subclass")
43 64
44 65 def create_notebook_checkpoint(self, nb, path):
45 66 """Create a checkpoint of the current state of a file
46 67
47 68 Returns a checkpoint model for the new checkpoint.
48 69 """
49 70 raise NotImplementedError("must be implemented in a subclass")
50 71
51 72 def get_checkpoint(self, checkpoint_id, path, type):
52 73 """Get the content of a checkpoint.
53 74
54 75 Returns an unvalidated model with the same structure as
55 76 the return value of ContentsManager.get
56 77 """
57 78 raise NotImplementedError("must be implemented in a subclass")
58 79
59 80 def rename_checkpoint(self, checkpoint_id, old_path, new_path):
60 81 """Rename a single checkpoint from old_path to new_path."""
61 82 raise NotImplementedError("must be implemented in a subclass")
62 83
63 84 def delete_checkpoint(self, checkpoint_id, path):
64 85 """delete a checkpoint for a file"""
65 86 raise NotImplementedError("must be implemented in a subclass")
66 87
67 88 def list_checkpoints(self, path):
68 89 """Return a list of checkpoints for a given file"""
69 90 raise NotImplementedError("must be implemented in a subclass")
70 91
71 92 def rename_all_checkpoints(self, old_path, new_path):
72 93 """Rename all checkpoints for old_path to new_path."""
73 94 for cp in self.list_checkpoints(old_path):
74 95 self.rename_checkpoint(cp['id'], old_path, new_path)
75 96
76 97 def delete_all_checkpoints(self, path):
77 98 """Delete all checkpoints for the given path."""
78 99 for checkpoint in self.list_checkpoints(path):
79 100 self.delete_checkpoint(checkpoint['id'], path)
80 101
81 102
82 103 class ContentsManager(LoggingConfigurable):
83 104 """Base class for serving files and directories.
84 105
85 106 This serves any text or binary file,
86 107 as well as directories,
87 108 with special handling for JSON notebook documents.
88 109
89 110 Most APIs take a path argument,
90 111 which is always an API-style unicode path,
91 112 and always refers to a directory.
92 113
93 114 - unicode, not url-escaped
94 115 - '/'-separated
95 116 - leading and trailing '/' will be stripped
96 117 - if unspecified, path defaults to '',
97 118 indicating the root path.
98 119
99 120 """
100 121
101 122 notary = Instance(sign.NotebookNotary)
102 123 def _notary_default(self):
103 124 return sign.NotebookNotary(parent=self)
104 125
105 126 hide_globs = List(Unicode, [
106 127 u'__pycache__', '*.pyc', '*.pyo',
107 128 '.DS_Store', '*.so', '*.dylib', '*~',
108 129 ], config=True, help="""
109 130 Glob patterns to hide in file and directory listings.
110 131 """)
111 132
112 133 untitled_notebook = Unicode("Untitled", config=True,
113 134 help="The base name used when creating untitled notebooks."
114 135 )
115 136
116 137 untitled_file = Unicode("untitled", config=True,
117 138 help="The base name used when creating untitled files."
118 139 )
119 140
120 141 untitled_directory = Unicode("Untitled Folder", config=True,
121 142 help="The base name used when creating untitled directories."
122 143 )
123 144
124 145 pre_save_hook = Any(None, config=True,
125 146 help="""Python callable or importstring thereof
126 147
127 148 To be called on a contents model prior to save.
128 149
129 150 This can be used to process the structure,
130 151 such as removing notebook outputs or other side effects that
131 152 should not be saved.
132 153
133 154 It will be called as (all arguments passed by keyword):
134 155
135 156 hook(path=path, model=model, contents_manager=self)
136 157
137 158 model: the model to be saved. Includes file contents.
138 159 modifying this dict will affect the file that is stored.
139 160 path: the API path of the save destination
140 161 contents_manager: this ContentsManager instance
141 162 """
142 163 )
143 164 def _pre_save_hook_changed(self, name, old, new):
144 165 if new and isinstance(new, string_types):
145 166 self.pre_save_hook = import_item(self.pre_save_hook)
146 167 elif new:
147 168 if not callable(new):
148 169 raise TraitError("pre_save_hook must be callable")
149 170
150 171 def run_pre_save_hook(self, model, path, **kwargs):
151 172 """Run the pre-save hook if defined, and log errors"""
152 173 if self.pre_save_hook:
153 174 try:
154 175 self.log.debug("Running pre-save hook on %s", path)
155 176 self.pre_save_hook(model=model, path=path, contents_manager=self, **kwargs)
156 177 except Exception:
157 178 self.log.error("Pre-save hook failed on %s", path, exc_info=True)
158 179
159 180 checkpoint_manager_class = Type(CheckpointManager, config=True)
160 181 checkpoint_manager = Instance(CheckpointManager, config=True)
161 182 checkpoint_manager_kwargs = Dict(allow_none=False, config=True)
183 backend = Unicode(default_value="")
162 184
163 185 def _checkpoint_manager_default(self):
164 186 return self.checkpoint_manager_class(**self.checkpoint_manager_kwargs)
165 187
166 188 def _checkpoint_manager_kwargs_default(self):
167 189 return dict(
168 190 parent=self,
169 191 log=self.log,
170 192 )
171 193
172 194 # ContentsManager API part 1: methods that must be
173 195 # implemented in subclasses.
174 196
175 197 def dir_exists(self, path):
176 198 """Does the API-style path (directory) actually exist?
177 199
178 200 Like os.path.isdir
179 201
180 202 Override this method in subclasses.
181 203
182 204 Parameters
183 205 ----------
184 206 path : string
185 207 The path to check
186 208
187 209 Returns
188 210 -------
189 211 exists : bool
190 212 Whether the path does indeed exist.
191 213 """
192 214 raise NotImplementedError
193 215
194 216 def is_hidden(self, path):
195 217 """Does the API style path correspond to a hidden directory or file?
196 218
197 219 Parameters
198 220 ----------
199 221 path : string
200 222 The path to check. This is an API path (`/` separated,
201 223 relative to root dir).
202 224
203 225 Returns
204 226 -------
205 227 hidden : bool
206 228 Whether the path is hidden.
207 229
208 230 """
209 231 raise NotImplementedError
210 232
211 233 def file_exists(self, path=''):
212 234 """Does a file exist at the given path?
213 235
214 236 Like os.path.isfile
215 237
216 238 Override this method in subclasses.
217 239
218 240 Parameters
219 241 ----------
220 242 name : string
221 243 The name of the file you are checking.
222 244 path : string
223 245 The relative path to the file's directory (with '/' as separator)
224 246
225 247 Returns
226 248 -------
227 249 exists : bool
228 250 Whether the file exists.
229 251 """
230 252 raise NotImplementedError('must be implemented in a subclass')
231 253
232 254 def exists(self, path):
233 255 """Does a file or directory exist at the given path?
234 256
235 257 Like os.path.exists
236 258
237 259 Parameters
238 260 ----------
239 261 path : string
240 262 The relative path to the file's directory (with '/' as separator)
241 263
242 264 Returns
243 265 -------
244 266 exists : bool
245 267 Whether the target exists.
246 268 """
247 269 return self.file_exists(path) or self.dir_exists(path)
248 270
249 271 def get(self, path, content=True, type=None, format=None):
250 272 """Get the model of a file or directory with or without content."""
251 273 raise NotImplementedError('must be implemented in a subclass')
252 274
253 275 def save(self, model, path):
254 276 """Save the file or directory and return the model with no content.
255 277
256 278 Save implementations should call self.run_pre_save_hook(model=model, path=path)
257 279 prior to writing any data.
258 280 """
259 281 raise NotImplementedError('must be implemented in a subclass')
260 282
261 283 def delete_file(self, path):
262 284 """Delete file or directory by path."""
263 285 raise NotImplementedError('must be implemented in a subclass')
264 286
265 287 def rename_file(self, old_path, new_path):
266 288 """Rename a file."""
267 289 raise NotImplementedError('must be implemented in a subclass')
268 290
269 291 # ContentsManager API part 2: methods that have useable default
270 292 # implementations, but can be overridden in subclasses.
271 293
272 294 def delete(self, path):
273 295 """Delete a file/directory and any associated checkpoints."""
274 296 self.delete_file(path)
275 297 self.checkpoint_manager.delete_all_checkpoints(path)
276 298
277 299 def rename(self, old_path, new_path):
278 300 """Rename a file and any checkpoints associated with that file."""
279 301 self.rename_file(old_path, new_path)
280 302 self.checkpoint_manager.rename_all_checkpoints(old_path, new_path)
281 303
282 304 def update(self, model, path):
283 305 """Update the file's path
284 306
285 307 For use in PATCH requests, to enable renaming a file without
286 308 re-uploading its contents. Only used for renaming at the moment.
287 309 """
288 310 path = path.strip('/')
289 311 new_path = model.get('path', path).strip('/')
290 312 if path != new_path:
291 313 self.rename(path, new_path)
292 314 model = self.get(new_path, content=False)
293 315 return model
294 316
295 317 def info_string(self):
296 318 return "Serving contents"
297 319
298 320 def get_kernel_path(self, path, model=None):
299 321 """Return the API path for the kernel
300 322
301 323 KernelManagers can turn this value into a filesystem path,
302 324 or ignore it altogether.
303 325
304 326 The default value here will start kernels in the directory of the
305 327 notebook server. FileContentsManager overrides this to use the
306 328 directory containing the notebook.
307 329 """
308 330 return ''
309 331
310 332 def increment_filename(self, filename, path='', insert=''):
311 333 """Increment a filename until it is unique.
312 334
313 335 Parameters
314 336 ----------
315 337 filename : unicode
316 338 The name of a file, including extension
317 339 path : unicode
318 340 The API path of the target's directory
319 341
320 342 Returns
321 343 -------
322 344 name : unicode
323 345 A filename that is unique, based on the input filename.
324 346 """
325 347 path = path.strip('/')
326 348 basename, ext = os.path.splitext(filename)
327 349 for i in itertools.count():
328 350 if i:
329 351 insert_i = '{}{}'.format(insert, i)
330 352 else:
331 353 insert_i = ''
332 354 name = u'{basename}{insert}{ext}'.format(basename=basename,
333 355 insert=insert_i, ext=ext)
334 356 if not self.exists(u'{}/{}'.format(path, name)):
335 357 break
336 358 return name
337 359
338 360 def validate_notebook_model(self, model):
339 361 """Add failed-validation message to model"""
340 362 try:
341 363 validate(model['content'])
342 364 except ValidationError as e:
343 365 model['message'] = u'Notebook Validation failed: {}:\n{}'.format(
344 366 e.message, json.dumps(e.instance, indent=1, default=lambda obj: '<UNKNOWN>'),
345 367 )
346 368 return model
347 369
348 370 def new_untitled(self, path='', type='', ext=''):
349 371 """Create a new untitled file or directory in path
350 372
351 373 path must be a directory
352 374
353 375 File extension can be specified.
354 376
355 377 Use `new` to create files with a fully specified path (including filename).
356 378 """
357 379 path = path.strip('/')
358 380 if not self.dir_exists(path):
359 381 raise HTTPError(404, 'No such directory: %s' % path)
360 382
361 383 model = {}
362 384 if type:
363 385 model['type'] = type
364 386
365 387 if ext == '.ipynb':
366 388 model.setdefault('type', 'notebook')
367 389 else:
368 390 model.setdefault('type', 'file')
369 391
370 392 insert = ''
371 393 if model['type'] == 'directory':
372 394 untitled = self.untitled_directory
373 395 insert = ' '
374 396 elif model['type'] == 'notebook':
375 397 untitled = self.untitled_notebook
376 398 ext = '.ipynb'
377 399 elif model['type'] == 'file':
378 400 untitled = self.untitled_file
379 401 else:
380 402 raise HTTPError(400, "Unexpected model type: %r" % model['type'])
381 403
382 404 name = self.increment_filename(untitled + ext, path, insert=insert)
383 405 path = u'{0}/{1}'.format(path, name)
384 406 return self.new(model, path)
385 407
386 408 def new(self, model=None, path=''):
387 409 """Create a new file or directory and return its model with no content.
388 410
389 411 To create a new untitled entity in a directory, use `new_untitled`.
390 412 """
391 413 path = path.strip('/')
392 414 if model is None:
393 415 model = {}
394 416
395 417 if path.endswith('.ipynb'):
396 418 model.setdefault('type', 'notebook')
397 419 else:
398 420 model.setdefault('type', 'file')
399 421
400 422 # no content, not a directory, so fill out new-file model
401 423 if 'content' not in model and model['type'] != 'directory':
402 424 if model['type'] == 'notebook':
403 425 model['content'] = new_notebook()
404 426 model['format'] = 'json'
405 427 else:
406 428 model['content'] = ''
407 429 model['type'] = 'file'
408 430 model['format'] = 'text'
409 431
410 432 model = self.save(model, path)
411 433 return model
412 434
413 435 def copy(self, from_path, to_path=None):
414 436 """Copy an existing file and return its new model.
415 437
416 438 If to_path not specified, it will be the parent directory of from_path.
417 439 If to_path is a directory, filename will increment `from_path-Copy#.ext`.
418 440
419 441 from_path must be a full path to a file.
420 442 """
421 443 path = from_path.strip('/')
422 444 if to_path is not None:
423 445 to_path = to_path.strip('/')
424 446
425 447 if '/' in path:
426 448 from_dir, from_name = path.rsplit('/', 1)
427 449 else:
428 450 from_dir = ''
429 451 from_name = path
430 452
431 453 model = self.get(path)
432 454 model.pop('path', None)
433 455 model.pop('name', None)
434 456 if model['type'] == 'directory':
435 457 raise HTTPError(400, "Can't copy directories")
436 458
437 459 if to_path is None:
438 460 to_path = from_dir
439 461 if self.dir_exists(to_path):
440 462 name = copy_pat.sub(u'.', from_name)
441 463 to_name = self.increment_filename(name, to_path, insert='-Copy')
442 464 to_path = u'{0}/{1}'.format(to_path, to_name)
443 465
444 466 model = self.save(model, to_path)
445 467 return model
446 468
447 469 def log_info(self):
448 470 self.log.info(self.info_string())
449 471
450 472 def trust_notebook(self, path):
451 473 """Explicitly trust a notebook
452 474
453 475 Parameters
454 476 ----------
455 477 path : string
456 478 The path of a notebook
457 479 """
458 480 model = self.get(path)
459 481 nb = model['content']
460 482 self.log.warn("Trusting notebook %s", path)
461 483 self.notary.mark_cells(nb, True)
462 484 self.save(model, path)
463 485
464 486 def check_and_sign(self, nb, path=''):
465 487 """Check for trusted cells, and sign the notebook.
466 488
467 489 Called as a part of saving notebooks.
468 490
469 491 Parameters
470 492 ----------
471 493 nb : dict
472 494 The notebook dict
473 495 path : string
474 496 The notebook's path (for logging)
475 497 """
476 498 if self.notary.check_cells(nb):
477 499 self.notary.sign(nb)
478 500 else:
479 501 self.log.warn("Saving untrusted notebook %s", path)
480 502
481 503 def mark_trusted_cells(self, nb, path=''):
482 504 """Mark cells as trusted if the notebook signature matches.
483 505
484 506 Called as a part of loading notebooks.
485 507
486 508 Parameters
487 509 ----------
488 510 nb : dict
489 511 The notebook object (in current nbformat)
490 512 path : string
491 513 The notebook's path (for logging)
492 514 """
493 515 trusted = self.notary.check_signature(nb)
494 516 if not trusted:
495 517 self.log.warn("Notebook %s is not trusted", path)
496 518 self.notary.mark_cells(nb, trusted)
497 519
498 520 def should_list(self, name):
499 521 """Should this file/directory name be displayed in a listing?"""
500 522 return not any(fnmatch(name, glob) for glob in self.hide_globs)
501 523
502 524 # Part 3: Checkpoints API
503 525 def create_checkpoint(self, path):
504 526 """Create a checkpoint."""
505 model = self.get(path, content=True)
506 type = model['type']
507 if type == 'notebook':
508 return self.checkpoint_manager.create_notebook_checkpoint(
509 model['content'],
510 path,
511 )
512 elif type == 'file':
513 return self.checkpoint_manager.create_file_checkpoint(
514 model['content'],
515 model['format'],
516 path,
517 )
518
519 def list_checkpoints(self, path):
520 return self.checkpoint_manager.list_checkpoints(path)
527 return self.checkpoint_manager.create_checkpoint(self, path)
521 528
522 529 def restore_checkpoint(self, checkpoint_id, path):
523 530 """
524 531 Restore a checkpoint.
525 532 """
526 return self.save(
527 model=self.checkpoint_manager.get_checkpoint(
528 checkpoint_id,
529 path,
530 self.get(path, content=False)['type']
531 ),
532 path=path,
533 )
533 self.checkpoint_manager.restore_checkpoint(self, checkpoint_id, path)
534
535 def list_checkpoints(self, path):
536 return self.checkpoint_manager.list_checkpoints(path)
534 537
535 538 def delete_checkpoint(self, checkpoint_id, path):
536 539 return self.checkpoint_manager.delete_checkpoint(checkpoint_id, path)
@@ -1,616 +1,638 b''
1 1 # coding: utf-8
2 2 """Test the contents webservice API."""
3 3
4 4 import base64
5 5 from contextlib import contextmanager
6 6 import io
7 7 import json
8 8 import os
9 9 import shutil
10 10 from unicodedata import normalize
11 11
12 12 pjoin = os.path.join
13 13
14 14 import requests
15 15
16 16 from IPython.html.utils import url_path_join, url_escape, to_os_path
17 17 from IPython.html.tests.launchnotebook import NotebookTestBase, assert_http_error
18 18 from IPython.nbformat import read, write, from_dict
19 19 from IPython.nbformat.v4 import (
20 20 new_notebook, new_markdown_cell,
21 21 )
22 22 from IPython.nbformat import v2
23 23 from IPython.utils import py3compat
24 24 from IPython.utils.data import uniq_stable
25 25 from IPython.utils.tempdir import TemporaryDirectory
26 26
27 27
28 28 def notebooks_only(dir_model):
29 29 return [nb for nb in dir_model['content'] if nb['type']=='notebook']
30 30
31 31 def dirs_only(dir_model):
32 32 return [x for x in dir_model['content'] if x['type']=='directory']
33 33
34 34
35 35 class API(object):
36 36 """Wrapper for contents API calls."""
37 37 def __init__(self, base_url):
38 38 self.base_url = base_url
39 39
40 40 def _req(self, verb, path, body=None, params=None):
41 41 response = requests.request(verb,
42 42 url_path_join(self.base_url, 'api/contents', path),
43 43 data=body, params=params,
44 44 )
45 45 response.raise_for_status()
46 46 return response
47 47
48 48 def list(self, path='/'):
49 49 return self._req('GET', path)
50 50
51 51 def read(self, path, type=None, format=None):
52 52 params = {}
53 53 if type is not None:
54 54 params['type'] = type
55 55 if format is not None:
56 56 params['format'] = format
57 57 return self._req('GET', path, params=params)
58 58
59 59 def create_untitled(self, path='/', ext='.ipynb'):
60 60 body = None
61 61 if ext:
62 62 body = json.dumps({'ext': ext})
63 63 return self._req('POST', path, body)
64 64
65 65 def mkdir_untitled(self, path='/'):
66 66 return self._req('POST', path, json.dumps({'type': 'directory'}))
67 67
68 68 def copy(self, copy_from, path='/'):
69 69 body = json.dumps({'copy_from':copy_from})
70 70 return self._req('POST', path, body)
71 71
72 72 def create(self, path='/'):
73 73 return self._req('PUT', path)
74 74
75 75 def upload(self, path, body):
76 76 return self._req('PUT', path, body)
77 77
78 78 def mkdir(self, path='/'):
79 79 return self._req('PUT', path, json.dumps({'type': 'directory'}))
80 80
81 81 def copy_put(self, copy_from, path='/'):
82 82 body = json.dumps({'copy_from':copy_from})
83 83 return self._req('PUT', path, body)
84 84
85 85 def save(self, path, body):
86 86 return self._req('PUT', path, body)
87 87
88 88 def delete(self, path='/'):
89 89 return self._req('DELETE', path)
90 90
91 91 def rename(self, path, new_path):
92 92 body = json.dumps({'path': new_path})
93 93 return self._req('PATCH', path, body)
94 94
95 95 def get_checkpoints(self, path):
96 96 return self._req('GET', url_path_join(path, 'checkpoints'))
97 97
98 98 def new_checkpoint(self, path):
99 99 return self._req('POST', url_path_join(path, 'checkpoints'))
100 100
101 101 def restore_checkpoint(self, path, checkpoint_id):
102 102 return self._req('POST', url_path_join(path, 'checkpoints', checkpoint_id))
103 103
104 104 def delete_checkpoint(self, path, checkpoint_id):
105 105 return self._req('DELETE', url_path_join(path, 'checkpoints', checkpoint_id))
106 106
107 107 class APITest(NotebookTestBase):
108 108 """Test the kernels web service API"""
109 109 dirs_nbs = [('', 'inroot'),
110 110 ('Directory with spaces in', 'inspace'),
111 111 (u'unicodΓ©', 'innonascii'),
112 112 ('foo', 'a'),
113 113 ('foo', 'b'),
114 114 ('foo', 'name with spaces'),
115 115 ('foo', u'unicodΓ©'),
116 116 ('foo/bar', 'baz'),
117 117 ('ordering', 'A'),
118 118 ('ordering', 'b'),
119 119 ('ordering', 'C'),
120 120 (u'Γ₯ b', u'Γ§ d'),
121 121 ]
122 122 hidden_dirs = ['.hidden', '__pycache__']
123 123
124 124 # Don't include root dir.
125 125 dirs = uniq_stable([py3compat.cast_unicode(d) for (d,n) in dirs_nbs[1:]])
126 126 top_level_dirs = {normalize('NFC', d.split('/')[0]) for d in dirs}
127 127
128 128 @staticmethod
129 129 def _blob_for_name(name):
130 130 return name.encode('utf-8') + b'\xFF'
131 131
132 132 @staticmethod
133 133 def _txt_for_name(name):
134 134 return u'%s text file' % name
135 135
136 136 def to_os_path(self, api_path):
137 137 return to_os_path(api_path, root=self.notebook_dir.name)
138 138
139 139 def make_dir(self, api_path):
140 140 """Create a directory at api_path"""
141 141 os_path = self.to_os_path(api_path)
142 142 try:
143 143 os.makedirs(os_path)
144 144 except OSError:
145 145 print("Directory already exists: %r" % os_path)
146 146
147 147 def make_txt(self, api_path, txt):
148 148 """Make a text file at a given api_path"""
149 149 os_path = self.to_os_path(api_path)
150 150 with io.open(os_path, 'w', encoding='utf-8') as f:
151 151 f.write(txt)
152 152
153 153 def make_blob(self, api_path, blob):
154 154 """Make a binary file at a given api_path"""
155 155 os_path = self.to_os_path(api_path)
156 156 with io.open(os_path, 'wb') as f:
157 157 f.write(blob)
158 158
159 159 def make_nb(self, api_path, nb):
160 160 """Make a notebook file at a given api_path"""
161 161 os_path = self.to_os_path(api_path)
162 162
163 163 with io.open(os_path, 'w', encoding='utf-8') as f:
164 164 write(nb, f, version=4)
165 165
166 166 def delete_dir(self, api_path):
167 167 """Delete a directory at api_path, removing any contents."""
168 168 os_path = self.to_os_path(api_path)
169 169 shutil.rmtree(os_path, ignore_errors=True)
170 170
171 171 def delete_file(self, api_path):
172 172 """Delete a file at the given path if it exists."""
173 173 if self.isfile(api_path):
174 174 os.unlink(self.to_os_path(api_path))
175 175
176 176 def isfile(self, api_path):
177 177 return os.path.isfile(self.to_os_path(api_path))
178 178
179 179 def isdir(self, api_path):
180 180 return os.path.isdir(self.to_os_path(api_path))
181 181
182 182 def setUp(self):
183 183
184 184 for d in (self.dirs + self.hidden_dirs):
185 185 self.make_dir(d)
186 186
187 187 for d, name in self.dirs_nbs:
188 188 # create a notebook
189 189 nb = new_notebook()
190 190 self.make_nb(u'{}/{}.ipynb'.format(d, name), nb)
191 191
192 192 # create a text file
193 193 txt = self._txt_for_name(name)
194 194 self.make_txt(u'{}/{}.txt'.format(d, name), txt)
195 195
196 196 # create a binary file
197 197 blob = self._blob_for_name(name)
198 198 self.make_blob(u'{}/{}.blob'.format(d, name), blob)
199 199
200 200 self.api = API(self.base_url())
201 201
202 202 def tearDown(self):
203 203 for dname in (list(self.top_level_dirs) + self.hidden_dirs):
204 204 self.delete_dir(dname)
205 205 self.delete_file('inroot.ipynb')
206 206
207 207 def test_list_notebooks(self):
208 208 nbs = notebooks_only(self.api.list().json())
209 209 self.assertEqual(len(nbs), 1)
210 210 self.assertEqual(nbs[0]['name'], 'inroot.ipynb')
211 211
212 212 nbs = notebooks_only(self.api.list('/Directory with spaces in/').json())
213 213 self.assertEqual(len(nbs), 1)
214 214 self.assertEqual(nbs[0]['name'], 'inspace.ipynb')
215 215
216 216 nbs = notebooks_only(self.api.list(u'/unicodΓ©/').json())
217 217 self.assertEqual(len(nbs), 1)
218 218 self.assertEqual(nbs[0]['name'], 'innonascii.ipynb')
219 219 self.assertEqual(nbs[0]['path'], u'unicodΓ©/innonascii.ipynb')
220 220
221 221 nbs = notebooks_only(self.api.list('/foo/bar/').json())
222 222 self.assertEqual(len(nbs), 1)
223 223 self.assertEqual(nbs[0]['name'], 'baz.ipynb')
224 224 self.assertEqual(nbs[0]['path'], 'foo/bar/baz.ipynb')
225 225
226 226 nbs = notebooks_only(self.api.list('foo').json())
227 227 self.assertEqual(len(nbs), 4)
228 228 nbnames = { normalize('NFC', n['name']) for n in nbs }
229 229 expected = [ u'a.ipynb', u'b.ipynb', u'name with spaces.ipynb', u'unicodΓ©.ipynb']
230 230 expected = { normalize('NFC', name) for name in expected }
231 231 self.assertEqual(nbnames, expected)
232 232
233 233 nbs = notebooks_only(self.api.list('ordering').json())
234 234 nbnames = [n['name'] for n in nbs]
235 235 expected = ['A.ipynb', 'b.ipynb', 'C.ipynb']
236 236 self.assertEqual(nbnames, expected)
237 237
238 238 def test_list_dirs(self):
239 239 dirs = dirs_only(self.api.list().json())
240 240 dir_names = {normalize('NFC', d['name']) for d in dirs}
241 241 self.assertEqual(dir_names, self.top_level_dirs) # Excluding hidden dirs
242 242
243 243 def test_list_nonexistant_dir(self):
244 244 with assert_http_error(404):
245 245 self.api.list('nonexistant')
246 246
247 247 def test_get_nb_contents(self):
248 248 for d, name in self.dirs_nbs:
249 249 path = url_path_join(d, name + '.ipynb')
250 250 nb = self.api.read(path).json()
251 251 self.assertEqual(nb['name'], u'%s.ipynb' % name)
252 252 self.assertEqual(nb['path'], path)
253 253 self.assertEqual(nb['type'], 'notebook')
254 254 self.assertIn('content', nb)
255 255 self.assertEqual(nb['format'], 'json')
256 256 self.assertIn('content', nb)
257 257 self.assertIn('metadata', nb['content'])
258 258 self.assertIsInstance(nb['content']['metadata'], dict)
259 259
260 260 def test_get_contents_no_such_file(self):
261 261 # Name that doesn't exist - should be a 404
262 262 with assert_http_error(404):
263 263 self.api.read('foo/q.ipynb')
264 264
265 265 def test_get_text_file_contents(self):
266 266 for d, name in self.dirs_nbs:
267 267 path = url_path_join(d, name + '.txt')
268 268 model = self.api.read(path).json()
269 269 self.assertEqual(model['name'], u'%s.txt' % name)
270 270 self.assertEqual(model['path'], path)
271 271 self.assertIn('content', model)
272 272 self.assertEqual(model['format'], 'text')
273 273 self.assertEqual(model['type'], 'file')
274 274 self.assertEqual(model['content'], self._txt_for_name(name))
275 275
276 276 # Name that doesn't exist - should be a 404
277 277 with assert_http_error(404):
278 278 self.api.read('foo/q.txt')
279 279
280 280 # Specifying format=text should fail on a non-UTF-8 file
281 281 with assert_http_error(400):
282 282 self.api.read('foo/bar/baz.blob', type='file', format='text')
283 283
284 284 def test_get_binary_file_contents(self):
285 285 for d, name in self.dirs_nbs:
286 286 path = url_path_join(d, name + '.blob')
287 287 model = self.api.read(path).json()
288 288 self.assertEqual(model['name'], u'%s.blob' % name)
289 289 self.assertEqual(model['path'], path)
290 290 self.assertIn('content', model)
291 291 self.assertEqual(model['format'], 'base64')
292 292 self.assertEqual(model['type'], 'file')
293 293 self.assertEqual(
294 294 base64.decodestring(model['content'].encode('ascii')),
295 295 self._blob_for_name(name),
296 296 )
297 297
298 298 # Name that doesn't exist - should be a 404
299 299 with assert_http_error(404):
300 300 self.api.read('foo/q.txt')
301 301
302 302 def test_get_bad_type(self):
303 303 with assert_http_error(400):
304 304 self.api.read(u'unicodΓ©', type='file') # this is a directory
305 305
306 306 with assert_http_error(400):
307 307 self.api.read(u'unicodΓ©/innonascii.ipynb', type='directory')
308 308
309 309 def _check_created(self, resp, path, type='notebook'):
310 310 self.assertEqual(resp.status_code, 201)
311 311 location_header = py3compat.str_to_unicode(resp.headers['Location'])
312 312 self.assertEqual(location_header, url_escape(url_path_join(u'/api/contents', path)))
313 313 rjson = resp.json()
314 314 self.assertEqual(rjson['name'], path.rsplit('/', 1)[-1])
315 315 self.assertEqual(rjson['path'], path)
316 316 self.assertEqual(rjson['type'], type)
317 317 isright = self.isdir if type == 'directory' else self.isfile
318 318 assert isright(path)
319 319
320 320 def test_create_untitled(self):
321 321 resp = self.api.create_untitled(path=u'Γ₯ b')
322 322 self._check_created(resp, u'Γ₯ b/Untitled.ipynb')
323 323
324 324 # Second time
325 325 resp = self.api.create_untitled(path=u'Γ₯ b')
326 326 self._check_created(resp, u'Γ₯ b/Untitled1.ipynb')
327 327
328 328 # And two directories down
329 329 resp = self.api.create_untitled(path='foo/bar')
330 330 self._check_created(resp, 'foo/bar/Untitled.ipynb')
331 331
332 332 def test_create_untitled_txt(self):
333 333 resp = self.api.create_untitled(path='foo/bar', ext='.txt')
334 334 self._check_created(resp, 'foo/bar/untitled.txt', type='file')
335 335
336 336 resp = self.api.read(path='foo/bar/untitled.txt')
337 337 model = resp.json()
338 338 self.assertEqual(model['type'], 'file')
339 339 self.assertEqual(model['format'], 'text')
340 340 self.assertEqual(model['content'], '')
341 341
342 342 def test_upload(self):
343 343 nb = new_notebook()
344 344 nbmodel = {'content': nb, 'type': 'notebook'}
345 345 path = u'Γ₯ b/Upload tΓ©st.ipynb'
346 346 resp = self.api.upload(path, body=json.dumps(nbmodel))
347 347 self._check_created(resp, path)
348 348
349 349 def test_mkdir_untitled(self):
350 350 resp = self.api.mkdir_untitled(path=u'Γ₯ b')
351 351 self._check_created(resp, u'Γ₯ b/Untitled Folder', type='directory')
352 352
353 353 # Second time
354 354 resp = self.api.mkdir_untitled(path=u'Γ₯ b')
355 355 self._check_created(resp, u'Γ₯ b/Untitled Folder 1', type='directory')
356 356
357 357 # And two directories down
358 358 resp = self.api.mkdir_untitled(path='foo/bar')
359 359 self._check_created(resp, 'foo/bar/Untitled Folder', type='directory')
360 360
361 361 def test_mkdir(self):
362 362 path = u'Γ₯ b/New βˆ‚ir'
363 363 resp = self.api.mkdir(path)
364 364 self._check_created(resp, path, type='directory')
365 365
366 366 def test_mkdir_hidden_400(self):
367 367 with assert_http_error(400):
368 368 resp = self.api.mkdir(u'Γ₯ b/.hidden')
369 369
370 370 def test_upload_txt(self):
371 371 body = u'ΓΌnicode tΓ©xt'
372 372 model = {
373 373 'content' : body,
374 374 'format' : 'text',
375 375 'type' : 'file',
376 376 }
377 377 path = u'Γ₯ b/Upload tΓ©st.txt'
378 378 resp = self.api.upload(path, body=json.dumps(model))
379 379
380 380 # check roundtrip
381 381 resp = self.api.read(path)
382 382 model = resp.json()
383 383 self.assertEqual(model['type'], 'file')
384 384 self.assertEqual(model['format'], 'text')
385 385 self.assertEqual(model['content'], body)
386 386
387 387 def test_upload_b64(self):
388 388 body = b'\xFFblob'
389 389 b64body = base64.encodestring(body).decode('ascii')
390 390 model = {
391 391 'content' : b64body,
392 392 'format' : 'base64',
393 393 'type' : 'file',
394 394 }
395 395 path = u'Γ₯ b/Upload tΓ©st.blob'
396 396 resp = self.api.upload(path, body=json.dumps(model))
397 397
398 398 # check roundtrip
399 399 resp = self.api.read(path)
400 400 model = resp.json()
401 401 self.assertEqual(model['type'], 'file')
402 402 self.assertEqual(model['path'], path)
403 403 self.assertEqual(model['format'], 'base64')
404 404 decoded = base64.decodestring(model['content'].encode('ascii'))
405 405 self.assertEqual(decoded, body)
406 406
407 407 def test_upload_v2(self):
408 408 nb = v2.new_notebook()
409 409 ws = v2.new_worksheet()
410 410 nb.worksheets.append(ws)
411 411 ws.cells.append(v2.new_code_cell(input='print("hi")'))
412 412 nbmodel = {'content': nb, 'type': 'notebook'}
413 413 path = u'Γ₯ b/Upload tΓ©st.ipynb'
414 414 resp = self.api.upload(path, body=json.dumps(nbmodel))
415 415 self._check_created(resp, path)
416 416 resp = self.api.read(path)
417 417 data = resp.json()
418 418 self.assertEqual(data['content']['nbformat'], 4)
419 419
420 420 def test_copy(self):
421 421 resp = self.api.copy(u'Γ₯ b/Γ§ d.ipynb', u'Γ₯ b')
422 422 self._check_created(resp, u'Γ₯ b/Γ§ d-Copy1.ipynb')
423 423
424 424 resp = self.api.copy(u'Γ₯ b/Γ§ d.ipynb', u'Γ₯ b')
425 425 self._check_created(resp, u'Γ₯ b/Γ§ d-Copy2.ipynb')
426 426
427 427 def test_copy_copy(self):
428 428 resp = self.api.copy(u'Γ₯ b/Γ§ d.ipynb', u'Γ₯ b')
429 429 self._check_created(resp, u'Γ₯ b/Γ§ d-Copy1.ipynb')
430 430
431 431 resp = self.api.copy(u'Γ₯ b/Γ§ d-Copy1.ipynb', u'Γ₯ b')
432 432 self._check_created(resp, u'Γ₯ b/Γ§ d-Copy2.ipynb')
433 433
434 434 def test_copy_path(self):
435 435 resp = self.api.copy(u'foo/a.ipynb', u'Γ₯ b')
436 436 self._check_created(resp, u'Γ₯ b/a.ipynb')
437 437
438 438 resp = self.api.copy(u'foo/a.ipynb', u'Γ₯ b')
439 439 self._check_created(resp, u'Γ₯ b/a-Copy1.ipynb')
440 440
441 441 def test_copy_put_400(self):
442 442 with assert_http_error(400):
443 443 resp = self.api.copy_put(u'Γ₯ b/Γ§ d.ipynb', u'Γ₯ b/cΓΈpy.ipynb')
444 444
445 445 def test_copy_dir_400(self):
446 446 # can't copy directories
447 447 with assert_http_error(400):
448 448 resp = self.api.copy(u'Γ₯ b', u'foo')
449 449
450 450 def test_delete(self):
451 451 for d, name in self.dirs_nbs:
452 452 print('%r, %r' % (d, name))
453 453 resp = self.api.delete(url_path_join(d, name + '.ipynb'))
454 454 self.assertEqual(resp.status_code, 204)
455 455
456 456 for d in self.dirs + ['/']:
457 457 nbs = notebooks_only(self.api.list(d).json())
458 458 print('------')
459 459 print(d)
460 460 print(nbs)
461 461 self.assertEqual(nbs, [])
462 462
463 463 def test_delete_dirs(self):
464 464 # depth-first delete everything, so we don't try to delete empty directories
465 465 for name in sorted(self.dirs + ['/'], key=len, reverse=True):
466 466 listing = self.api.list(name).json()['content']
467 467 for model in listing:
468 468 self.api.delete(model['path'])
469 469 listing = self.api.list('/').json()['content']
470 470 self.assertEqual(listing, [])
471 471
472 472 def test_delete_non_empty_dir(self):
473 473 """delete non-empty dir raises 400"""
474 474 with assert_http_error(400):
475 475 self.api.delete(u'Γ₯ b')
476 476
477 477 def test_rename(self):
478 478 resp = self.api.rename('foo/a.ipynb', 'foo/z.ipynb')
479 479 self.assertEqual(resp.headers['Location'].split('/')[-1], 'z.ipynb')
480 480 self.assertEqual(resp.json()['name'], 'z.ipynb')
481 481 self.assertEqual(resp.json()['path'], 'foo/z.ipynb')
482 482 assert self.isfile('foo/z.ipynb')
483 483
484 484 nbs = notebooks_only(self.api.list('foo').json())
485 485 nbnames = set(n['name'] for n in nbs)
486 486 self.assertIn('z.ipynb', nbnames)
487 487 self.assertNotIn('a.ipynb', nbnames)
488 488
489 489 def test_rename_existing(self):
490 490 with assert_http_error(409):
491 491 self.api.rename('foo/a.ipynb', 'foo/b.ipynb')
492 492
493 493 def test_save(self):
494 494 resp = self.api.read('foo/a.ipynb')
495 495 nbcontent = json.loads(resp.text)['content']
496 496 nb = from_dict(nbcontent)
497 497 nb.cells.append(new_markdown_cell(u'Created by test Β³'))
498 498
499 499 nbmodel= {'content': nb, 'type': 'notebook'}
500 500 resp = self.api.save('foo/a.ipynb', body=json.dumps(nbmodel))
501 501
502 502 nbcontent = self.api.read('foo/a.ipynb').json()['content']
503 503 newnb = from_dict(nbcontent)
504 504 self.assertEqual(newnb.cells[0].source,
505 505 u'Created by test Β³')
506 506
507 507 def test_checkpoints(self):
508 508 resp = self.api.read('foo/a.ipynb')
509 509 r = self.api.new_checkpoint('foo/a.ipynb')
510 510 self.assertEqual(r.status_code, 201)
511 511 cp1 = r.json()
512 512 self.assertEqual(set(cp1), {'id', 'last_modified'})
513 513 self.assertEqual(r.headers['Location'].split('/')[-1], cp1['id'])
514 514
515 515 # Modify it
516 516 nbcontent = json.loads(resp.text)['content']
517 517 nb = from_dict(nbcontent)
518 518 hcell = new_markdown_cell('Created by test')
519 519 nb.cells.append(hcell)
520 520 # Save
521 521 nbmodel= {'content': nb, 'type': 'notebook'}
522 522 resp = self.api.save('foo/a.ipynb', body=json.dumps(nbmodel))
523 523
524 524 # List checkpoints
525 525 cps = self.api.get_checkpoints('foo/a.ipynb').json()
526 526 self.assertEqual(cps, [cp1])
527 527
528 528 nbcontent = self.api.read('foo/a.ipynb').json()['content']
529 529 nb = from_dict(nbcontent)
530 530 self.assertEqual(nb.cells[0].source, 'Created by test')
531 531
532 532 # Restore cp1
533 533 r = self.api.restore_checkpoint('foo/a.ipynb', cp1['id'])
534 534 self.assertEqual(r.status_code, 204)
535 535 nbcontent = self.api.read('foo/a.ipynb').json()['content']
536 536 nb = from_dict(nbcontent)
537 537 self.assertEqual(nb.cells, [])
538 538
539 539 # Delete cp1
540 540 r = self.api.delete_checkpoint('foo/a.ipynb', cp1['id'])
541 541 self.assertEqual(r.status_code, 204)
542 542 cps = self.api.get_checkpoints('foo/a.ipynb').json()
543 543 self.assertEqual(cps, [])
544 544
545 545 def test_file_checkpoints(self):
546 546 """
547 547 Test checkpointing of non-notebook files.
548 548 """
549 549 filename = 'foo/a.txt'
550 550 resp = self.api.read(filename)
551 551 orig_content = json.loads(resp.text)['content']
552 552
553 553 # Create a checkpoint.
554 554 r = self.api.new_checkpoint(filename)
555 555 self.assertEqual(r.status_code, 201)
556 556 cp1 = r.json()
557 557 self.assertEqual(set(cp1), {'id', 'last_modified'})
558 558 self.assertEqual(r.headers['Location'].split('/')[-1], cp1['id'])
559 559
560 560 # Modify the file and save.
561 561 new_content = orig_content + '\nsecond line'
562 562 model = {
563 563 'content': new_content,
564 564 'type': 'file',
565 565 'format': 'text',
566 566 }
567 567 resp = self.api.save(filename, body=json.dumps(model))
568 568
569 569 # List checkpoints
570 570 cps = self.api.get_checkpoints(filename).json()
571 571 self.assertEqual(cps, [cp1])
572 572
573 573 content = self.api.read(filename).json()['content']
574 574 self.assertEqual(content, new_content)
575 575
576 576 # Restore cp1
577 577 r = self.api.restore_checkpoint(filename, cp1['id'])
578 578 self.assertEqual(r.status_code, 204)
579 579 restored_content = self.api.read(filename).json()['content']
580 580 self.assertEqual(restored_content, orig_content)
581 581
582 582 # Delete cp1
583 583 r = self.api.delete_checkpoint(filename, cp1['id'])
584 584 self.assertEqual(r.status_code, 204)
585 585 cps = self.api.get_checkpoints(filename).json()
586 586 self.assertEqual(cps, [])
587 587
588 588 @contextmanager
589 589 def patch_cp_root(self, dirname):
590 590 """
591 591 Temporarily patch the root dir of our checkpoint manager.
592 592 """
593 593 cpm = self.notebook.contents_manager.checkpoint_manager
594 594 old_dirname = cpm.root_dir
595 595 cpm.root_dir = dirname
596 596 try:
597 597 yield
598 598 finally:
599 599 cpm.root_dir = old_dirname
600 600
601 601 def test_checkpoints_separate_root(self):
602 602 """
603 603 Test that FileCheckpointManager functions correctly even when it's
604 604 using a different root dir from FileContentsManager. This also keeps
605 605 the implementation honest for use with ContentsManagers that don't map
606 606 models to the filesystem
607 607
608 608 Override this method to a no-op when testing other managers.
609 609 """
610 610 with TemporaryDirectory() as td:
611 611 with self.patch_cp_root(td):
612 612 self.test_checkpoints()
613 613
614 614 with TemporaryDirectory() as td:
615 615 with self.patch_cp_root(td):
616 616 self.test_file_checkpoints()
617
618 @contextmanager
619 def patch_cm_backend(self):
620 """
621 Temporarily patch our ContentsManager to present a different backend.
622 """
623 mgr = self.notebook.contents_manager
624 old_backend = mgr.backend
625 mgr.backend = ""
626 try:
627 yield
628 finally:
629 mgr.backend = old_backend
630
631 def test_checkpoints_empty_backend(self):
632 with self.patch_cm_backend():
633 self.test_checkpoints()
634
635 with self.patch_cm_backend():
636 self.test_file_checkpoints()
637
638
General Comments 0
You need to be logged in to leave comments. Login now