##// END OF EJS Templates
docstring
Min RK -
Show More
@@ -1,693 +1,692 b''
1 1 """A contents manager that uses the local file system for storage."""
2 2
3 3 # Copyright (c) IPython Development Team.
4 4 # Distributed under the terms of the Modified BSD License.
5 5
6 6 import base64
7 7 import errno
8 8 import io
9 9 import os
10 10 import shutil
11 11 from contextlib import contextmanager
12 12 import mimetypes
13 13
14 14 from tornado import web
15 15
16 16 from .manager import ContentsManager
17 17 from IPython import nbformat
18 18 from IPython.utils.io import atomic_writing
19 19 from IPython.utils.importstring import import_item
20 20 from IPython.utils.path import ensure_dir_exists
21 21 from IPython.utils.traitlets import Any, Unicode, Bool, TraitError
22 22 from IPython.utils.py3compat import getcwd, str_to_unicode, string_types
23 23 from IPython.utils import tz
24 24 from IPython.html.utils import is_hidden, to_os_path, to_api_path
25 25
26 26 _script_exporter = None
27 27
28 28 def _post_save_script(model, os_path, contents_manager, **kwargs):
29 29 """convert notebooks to Python script after save with nbconvert
30 30
31 31 replaces `ipython notebook --script`
32 32 """
33 33 from IPython.nbconvert.exporters.script import ScriptExporter
34 34
35 35 if model['type'] != 'notebook':
36 36 return
37 37
38 38 global _script_exporter
39 39 if _script_exporter is None:
40 40 _script_exporter = ScriptExporter(parent=contents_manager)
41 41 log = contents_manager.log
42 42
43 43 base, ext = os.path.splitext(os_path)
44 44 py_fname = base + '.py'
45 45 script, resources = _script_exporter.from_filename(os_path)
46 46 script_fname = base + resources.get('output_extension', '.txt')
47 47 log.info("Saving script /%s", to_api_path(script_fname, contents_manager.root_dir))
48 48 with io.open(script_fname, 'w', encoding='utf-8') as f:
49 49 f.write(script)
50 50
51 51 class FileContentsManager(ContentsManager):
52 52
53 53 root_dir = Unicode(config=True)
54 54
55 55 def _root_dir_default(self):
56 56 try:
57 57 return self.parent.notebook_dir
58 58 except AttributeError:
59 59 return getcwd()
60 60
61 61 @contextmanager
62 62 def perm_to_403(self, os_path=''):
63 63 """context manager for turning permission errors into 403"""
64 64 try:
65 65 yield
66 66 except OSError as e:
67 67 if e.errno in {errno.EPERM, errno.EACCES}:
68 68 # make 403 error message without root prefix
69 69 # this may not work perfectly on unicode paths on Python 2,
70 70 # but nobody should be doing that anyway.
71 71 if not os_path:
72 72 os_path = str_to_unicode(e.filename or 'unknown file')
73 73 path = to_api_path(os_path, self.root_dir)
74 74 raise web.HTTPError(403, u'Permission denied: %s' % path)
75 75 else:
76 76 raise
77 77
78 78 @contextmanager
79 79 def open(self, os_path, *args, **kwargs):
80 80 """wrapper around io.open that turns permission errors into 403"""
81 81 with self.perm_to_403(os_path):
82 82 with io.open(os_path, *args, **kwargs) as f:
83 83 yield f
84 84
85 85 @contextmanager
86 86 def atomic_writing(self, os_path, *args, **kwargs):
87 87 """wrapper around atomic_writing that turns permission errors into 403"""
88 88 with self.perm_to_403(os_path):
89 89 with atomic_writing(os_path, *args, **kwargs) as f:
90 90 yield f
91 91
92 92 save_script = Bool(False, config=True, help='DEPRECATED, use post_save_hook')
93 93 def _save_script_changed(self):
94 94 self.log.warn("""
95 95 `--script` is deprecated. You can trigger nbconvert via pre- or post-save hooks:
96 96
97 97 ContentsManager.pre_save_hook
98 98 FileContentsManager.post_save_hook
99 99
100 100 A post-save hook has been registered that calls:
101 101
102 102 ipython nbconvert --to script [notebook]
103 103
104 104 which behaves similarly to `--script`.
105 105 """)
106 106
107 107 self.post_save_hook = _post_save_script
108 108
109 109 post_save_hook = Any(None, config=True,
110 110 help="""Python callable or importstring thereof
111 111
112 112 to be called on the path of a file just saved.
113 113
114 This can be used to
115 114 This can be used to process the file on disk,
116 such as converting the notebook to other formats, such as Python or HTML via nbconvert
115 such as converting the notebook to a script or HTML via nbconvert.
117 116
118 117 It will be called as (all arguments passed by keyword):
119 118
120 119 hook(os_path=os_path, model=model, contents_manager=instance)
121 120
122 121 path: the filesystem path to the file just written
123 122 model: the model representing the file
124 123 contents_manager: this ContentsManager instance
125 124 """
126 125 )
127 126 def _post_save_hook_changed(self, name, old, new):
128 127 if new and isinstance(new, string_types):
129 128 self.post_save_hook = import_item(self.post_save_hook)
130 129 elif new:
131 130 if not callable(new):
132 131 raise TraitError("post_save_hook must be callable")
133 132
134 133 def run_post_save_hook(self, model, os_path):
135 134 """Run the post-save hook if defined, and log errors"""
136 135 if self.post_save_hook:
137 136 try:
138 137 self.log.debug("Running post-save hook on %s", os_path)
139 138 self.post_save_hook(os_path=os_path, model=model, contents_manager=self)
140 139 except Exception:
141 140 self.log.error("Post-save hook failed on %s", os_path, exc_info=True)
142 141
143 142 def _root_dir_changed(self, name, old, new):
144 143 """Do a bit of validation of the root_dir."""
145 144 if not os.path.isabs(new):
146 145 # If we receive a non-absolute path, make it absolute.
147 146 self.root_dir = os.path.abspath(new)
148 147 return
149 148 if not os.path.isdir(new):
150 149 raise TraitError("%r is not a directory" % new)
151 150
152 151 checkpoint_dir = Unicode('.ipynb_checkpoints', config=True,
153 152 help="""The directory name in which to keep file checkpoints
154 153
155 154 This is a path relative to the file's own directory.
156 155
157 156 By default, it is .ipynb_checkpoints
158 157 """
159 158 )
160 159
161 160 def _copy(self, src, dest):
162 161 """copy src to dest
163 162
164 163 like shutil.copy2, but log errors in copystat
165 164 """
166 165 shutil.copyfile(src, dest)
167 166 try:
168 167 shutil.copystat(src, dest)
169 168 except OSError as e:
170 169 self.log.debug("copystat on %s failed", dest, exc_info=True)
171 170
172 171 def _get_os_path(self, path):
173 172 """Given an API path, return its file system path.
174 173
175 174 Parameters
176 175 ----------
177 176 path : string
178 177 The relative API path to the named file.
179 178
180 179 Returns
181 180 -------
182 181 path : string
183 182 Native, absolute OS path to for a file.
184 183 """
185 184 return to_os_path(path, self.root_dir)
186 185
187 186 def dir_exists(self, path):
188 187 """Does the API-style path refer to an extant directory?
189 188
190 189 API-style wrapper for os.path.isdir
191 190
192 191 Parameters
193 192 ----------
194 193 path : string
195 194 The path to check. This is an API path (`/` separated,
196 195 relative to root_dir).
197 196
198 197 Returns
199 198 -------
200 199 exists : bool
201 200 Whether the path is indeed a directory.
202 201 """
203 202 path = path.strip('/')
204 203 os_path = self._get_os_path(path=path)
205 204 return os.path.isdir(os_path)
206 205
207 206 def is_hidden(self, path):
208 207 """Does the API style path correspond to a hidden directory or file?
209 208
210 209 Parameters
211 210 ----------
212 211 path : string
213 212 The path to check. This is an API path (`/` separated,
214 213 relative to root_dir).
215 214
216 215 Returns
217 216 -------
218 217 hidden : bool
219 218 Whether the path exists and is hidden.
220 219 """
221 220 path = path.strip('/')
222 221 os_path = self._get_os_path(path=path)
223 222 return is_hidden(os_path, self.root_dir)
224 223
225 224 def file_exists(self, path):
226 225 """Returns True if the file exists, else returns False.
227 226
228 227 API-style wrapper for os.path.isfile
229 228
230 229 Parameters
231 230 ----------
232 231 path : string
233 232 The relative path to the file (with '/' as separator)
234 233
235 234 Returns
236 235 -------
237 236 exists : bool
238 237 Whether the file exists.
239 238 """
240 239 path = path.strip('/')
241 240 os_path = self._get_os_path(path)
242 241 return os.path.isfile(os_path)
243 242
244 243 def exists(self, path):
245 244 """Returns True if the path exists, else returns False.
246 245
247 246 API-style wrapper for os.path.exists
248 247
249 248 Parameters
250 249 ----------
251 250 path : string
252 251 The API path to the file (with '/' as separator)
253 252
254 253 Returns
255 254 -------
256 255 exists : bool
257 256 Whether the target exists.
258 257 """
259 258 path = path.strip('/')
260 259 os_path = self._get_os_path(path=path)
261 260 return os.path.exists(os_path)
262 261
263 262 def _base_model(self, path):
264 263 """Build the common base of a contents model"""
265 264 os_path = self._get_os_path(path)
266 265 info = os.stat(os_path)
267 266 last_modified = tz.utcfromtimestamp(info.st_mtime)
268 267 created = tz.utcfromtimestamp(info.st_ctime)
269 268 # Create the base model.
270 269 model = {}
271 270 model['name'] = path.rsplit('/', 1)[-1]
272 271 model['path'] = path
273 272 model['last_modified'] = last_modified
274 273 model['created'] = created
275 274 model['content'] = None
276 275 model['format'] = None
277 276 model['mimetype'] = None
278 277 try:
279 278 model['writable'] = os.access(os_path, os.W_OK)
280 279 except OSError:
281 280 self.log.error("Failed to check write permissions on %s", os_path)
282 281 model['writable'] = False
283 282 return model
284 283
285 284 def _dir_model(self, path, content=True):
286 285 """Build a model for a directory
287 286
288 287 if content is requested, will include a listing of the directory
289 288 """
290 289 os_path = self._get_os_path(path)
291 290
292 291 four_o_four = u'directory does not exist: %r' % path
293 292
294 293 if not os.path.isdir(os_path):
295 294 raise web.HTTPError(404, four_o_four)
296 295 elif is_hidden(os_path, self.root_dir):
297 296 self.log.info("Refusing to serve hidden directory %r, via 404 Error",
298 297 os_path
299 298 )
300 299 raise web.HTTPError(404, four_o_four)
301 300
302 301 model = self._base_model(path)
303 302 model['type'] = 'directory'
304 303 if content:
305 304 model['content'] = contents = []
306 305 os_dir = self._get_os_path(path)
307 306 for name in os.listdir(os_dir):
308 307 os_path = os.path.join(os_dir, name)
309 308 # skip over broken symlinks in listing
310 309 if not os.path.exists(os_path):
311 310 self.log.warn("%s doesn't exist", os_path)
312 311 continue
313 312 elif not os.path.isfile(os_path) and not os.path.isdir(os_path):
314 313 self.log.debug("%s not a regular file", os_path)
315 314 continue
316 315 if self.should_list(name) and not is_hidden(os_path, self.root_dir):
317 316 contents.append(self.get(
318 317 path='%s/%s' % (path, name),
319 318 content=False)
320 319 )
321 320
322 321 model['format'] = 'json'
323 322
324 323 return model
325 324
326 325 def _file_model(self, path, content=True, format=None):
327 326 """Build a model for a file
328 327
329 328 if content is requested, include the file contents.
330 329
331 330 format:
332 331 If 'text', the contents will be decoded as UTF-8.
333 332 If 'base64', the raw bytes contents will be encoded as base64.
334 333 If not specified, try to decode as UTF-8, and fall back to base64
335 334 """
336 335 model = self._base_model(path)
337 336 model['type'] = 'file'
338 337
339 338 os_path = self._get_os_path(path)
340 339 model['mimetype'] = mimetypes.guess_type(os_path)[0] or 'text/plain'
341 340
342 341 if content:
343 342 if not os.path.isfile(os_path):
344 343 # could be FIFO
345 344 raise web.HTTPError(400, "Cannot get content of non-file %s" % os_path)
346 345 with self.open(os_path, 'rb') as f:
347 346 bcontent = f.read()
348 347
349 348 if format != 'base64':
350 349 try:
351 350 model['content'] = bcontent.decode('utf8')
352 351 except UnicodeError as e:
353 352 if format == 'text':
354 353 raise web.HTTPError(400, "%s is not UTF-8 encoded" % path)
355 354 else:
356 355 model['format'] = 'text'
357 356
358 357 if model['content'] is None:
359 358 model['content'] = base64.encodestring(bcontent).decode('ascii')
360 359 model['format'] = 'base64'
361 360
362 361 return model
363 362
364 363
365 364 def _notebook_model(self, path, content=True):
366 365 """Build a notebook model
367 366
368 367 if content is requested, the notebook content will be populated
369 368 as a JSON structure (not double-serialized)
370 369 """
371 370 model = self._base_model(path)
372 371 model['type'] = 'notebook'
373 372 if content:
374 373 os_path = self._get_os_path(path)
375 374 with self.open(os_path, 'r', encoding='utf-8') as f:
376 375 try:
377 376 nb = nbformat.read(f, as_version=4)
378 377 except Exception as e:
379 378 raise web.HTTPError(400, u"Unreadable Notebook: %s %r" % (os_path, e))
380 379 self.mark_trusted_cells(nb, path)
381 380 model['content'] = nb
382 381 model['format'] = 'json'
383 382 self.validate_notebook_model(model)
384 383 return model
385 384
386 385 def get(self, path, content=True, type_=None, format=None):
387 386 """ Takes a path for an entity and returns its model
388 387
389 388 Parameters
390 389 ----------
391 390 path : str
392 391 the API path that describes the relative path for the target
393 392 content : bool
394 393 Whether to include the contents in the reply
395 394 type_ : str, optional
396 395 The requested type - 'file', 'notebook', or 'directory'.
397 396 Will raise HTTPError 400 if the content doesn't match.
398 397 format : str, optional
399 398 The requested format for file contents. 'text' or 'base64'.
400 399 Ignored if this returns a notebook or directory model.
401 400
402 401 Returns
403 402 -------
404 403 model : dict
405 404 the contents model. If content=True, returns the contents
406 405 of the file or directory as well.
407 406 """
408 407 path = path.strip('/')
409 408
410 409 if not self.exists(path):
411 410 raise web.HTTPError(404, u'No such file or directory: %s' % path)
412 411
413 412 os_path = self._get_os_path(path)
414 413 if os.path.isdir(os_path):
415 414 if type_ not in (None, 'directory'):
416 415 raise web.HTTPError(400,
417 416 u'%s is a directory, not a %s' % (path, type_))
418 417 model = self._dir_model(path, content=content)
419 418 elif type_ == 'notebook' or (type_ is None and path.endswith('.ipynb')):
420 419 model = self._notebook_model(path, content=content)
421 420 else:
422 421 if type_ == 'directory':
423 422 raise web.HTTPError(400,
424 423 u'%s is not a directory')
425 424 model = self._file_model(path, content=content, format=format)
426 425 return model
427 426
428 427 def _save_notebook(self, os_path, model, path=''):
429 428 """save a notebook file"""
430 429 # Save the notebook file
431 430 nb = nbformat.from_dict(model['content'])
432 431
433 432 self.check_and_sign(nb, path)
434 433
435 434 with self.atomic_writing(os_path, encoding='utf-8') as f:
436 435 nbformat.write(nb, f, version=nbformat.NO_CONVERT)
437 436
438 437 def _save_file(self, os_path, model, path=''):
439 438 """save a non-notebook file"""
440 439 fmt = model.get('format', None)
441 440 if fmt not in {'text', 'base64'}:
442 441 raise web.HTTPError(400, "Must specify format of file contents as 'text' or 'base64'")
443 442 try:
444 443 content = model['content']
445 444 if fmt == 'text':
446 445 bcontent = content.encode('utf8')
447 446 else:
448 447 b64_bytes = content.encode('ascii')
449 448 bcontent = base64.decodestring(b64_bytes)
450 449 except Exception as e:
451 450 raise web.HTTPError(400, u'Encoding error saving %s: %s' % (os_path, e))
452 451 with self.atomic_writing(os_path, text=False) as f:
453 452 f.write(bcontent)
454 453
455 454 def _save_directory(self, os_path, model, path=''):
456 455 """create a directory"""
457 456 if is_hidden(os_path, self.root_dir):
458 457 raise web.HTTPError(400, u'Cannot create hidden directory %r' % os_path)
459 458 if not os.path.exists(os_path):
460 459 with self.perm_to_403():
461 460 os.mkdir(os_path)
462 461 elif not os.path.isdir(os_path):
463 462 raise web.HTTPError(400, u'Not a directory: %s' % (os_path))
464 463 else:
465 464 self.log.debug("Directory %r already exists", os_path)
466 465
467 466 def save(self, model, path=''):
468 467 """Save the file model and return the model with no content."""
469 468 path = path.strip('/')
470 469
471 470 if 'type' not in model:
472 471 raise web.HTTPError(400, u'No file type provided')
473 472 if 'content' not in model and model['type'] != 'directory':
474 473 raise web.HTTPError(400, u'No file content provided')
475 474
476 475 self.run_pre_save_hook(model=model, path=path)
477 476
478 477 # One checkpoint should always exist
479 478 if self.file_exists(path) and not self.list_checkpoints(path):
480 479 self.create_checkpoint(path)
481 480
482 481 os_path = self._get_os_path(path)
483 482 self.log.debug("Saving %s", os_path)
484 483 try:
485 484 if model['type'] == 'notebook':
486 485 self._save_notebook(os_path, model, path)
487 486 elif model['type'] == 'file':
488 487 self._save_file(os_path, model, path)
489 488 elif model['type'] == 'directory':
490 489 self._save_directory(os_path, model, path)
491 490 else:
492 491 raise web.HTTPError(400, "Unhandled contents type: %s" % model['type'])
493 492 except web.HTTPError:
494 493 raise
495 494 except Exception as e:
496 495 self.log.error(u'Error while saving file: %s %s', path, e, exc_info=True)
497 496 raise web.HTTPError(500, u'Unexpected error while saving file: %s %s' % (path, e))
498 497
499 498 validation_message = None
500 499 if model['type'] == 'notebook':
501 500 self.validate_notebook_model(model)
502 501 validation_message = model.get('message', None)
503 502
504 503 model = self.get(path, content=False)
505 504 if validation_message:
506 505 model['message'] = validation_message
507 506
508 507 self.run_post_save_hook(model=model, os_path=os_path)
509 508
510 509 return model
511 510
512 511 def update(self, model, path):
513 512 """Update the file's path
514 513
515 514 For use in PATCH requests, to enable renaming a file without
516 515 re-uploading its contents. Only used for renaming at the moment.
517 516 """
518 517 path = path.strip('/')
519 518 new_path = model.get('path', path).strip('/')
520 519 if path != new_path:
521 520 self.rename(path, new_path)
522 521 model = self.get(new_path, content=False)
523 522 return model
524 523
525 524 def delete(self, path):
526 525 """Delete file at path."""
527 526 path = path.strip('/')
528 527 os_path = self._get_os_path(path)
529 528 rm = os.unlink
530 529 if os.path.isdir(os_path):
531 530 listing = os.listdir(os_path)
532 531 # don't delete non-empty directories (checkpoints dir doesn't count)
533 532 if listing and listing != [self.checkpoint_dir]:
534 533 raise web.HTTPError(400, u'Directory %s not empty' % os_path)
535 534 elif not os.path.isfile(os_path):
536 535 raise web.HTTPError(404, u'File does not exist: %s' % os_path)
537 536
538 537 # clear checkpoints
539 538 for checkpoint in self.list_checkpoints(path):
540 539 checkpoint_id = checkpoint['id']
541 540 cp_path = self.get_checkpoint_path(checkpoint_id, path)
542 541 if os.path.isfile(cp_path):
543 542 self.log.debug("Unlinking checkpoint %s", cp_path)
544 543 with self.perm_to_403():
545 544 rm(cp_path)
546 545
547 546 if os.path.isdir(os_path):
548 547 self.log.debug("Removing directory %s", os_path)
549 548 with self.perm_to_403():
550 549 shutil.rmtree(os_path)
551 550 else:
552 551 self.log.debug("Unlinking file %s", os_path)
553 552 with self.perm_to_403():
554 553 rm(os_path)
555 554
556 555 def rename(self, old_path, new_path):
557 556 """Rename a file."""
558 557 old_path = old_path.strip('/')
559 558 new_path = new_path.strip('/')
560 559 if new_path == old_path:
561 560 return
562 561
563 562 new_os_path = self._get_os_path(new_path)
564 563 old_os_path = self._get_os_path(old_path)
565 564
566 565 # Should we proceed with the move?
567 566 if os.path.exists(new_os_path):
568 567 raise web.HTTPError(409, u'File already exists: %s' % new_path)
569 568
570 569 # Move the file
571 570 try:
572 571 with self.perm_to_403():
573 572 shutil.move(old_os_path, new_os_path)
574 573 except web.HTTPError:
575 574 raise
576 575 except Exception as e:
577 576 raise web.HTTPError(500, u'Unknown error renaming file: %s %s' % (old_path, e))
578 577
579 578 # Move the checkpoints
580 579 old_checkpoints = self.list_checkpoints(old_path)
581 580 for cp in old_checkpoints:
582 581 checkpoint_id = cp['id']
583 582 old_cp_path = self.get_checkpoint_path(checkpoint_id, old_path)
584 583 new_cp_path = self.get_checkpoint_path(checkpoint_id, new_path)
585 584 if os.path.isfile(old_cp_path):
586 585 self.log.debug("Renaming checkpoint %s -> %s", old_cp_path, new_cp_path)
587 586 with self.perm_to_403():
588 587 shutil.move(old_cp_path, new_cp_path)
589 588
590 589 # Checkpoint-related utilities
591 590
592 591 def get_checkpoint_path(self, checkpoint_id, path):
593 592 """find the path to a checkpoint"""
594 593 path = path.strip('/')
595 594 parent, name = ('/' + path).rsplit('/', 1)
596 595 parent = parent.strip('/')
597 596 basename, ext = os.path.splitext(name)
598 597 filename = u"{name}-{checkpoint_id}{ext}".format(
599 598 name=basename,
600 599 checkpoint_id=checkpoint_id,
601 600 ext=ext,
602 601 )
603 602 os_path = self._get_os_path(path=parent)
604 603 cp_dir = os.path.join(os_path, self.checkpoint_dir)
605 604 with self.perm_to_403():
606 605 ensure_dir_exists(cp_dir)
607 606 cp_path = os.path.join(cp_dir, filename)
608 607 return cp_path
609 608
610 609 def get_checkpoint_model(self, checkpoint_id, path):
611 610 """construct the info dict for a given checkpoint"""
612 611 path = path.strip('/')
613 612 cp_path = self.get_checkpoint_path(checkpoint_id, path)
614 613 stats = os.stat(cp_path)
615 614 last_modified = tz.utcfromtimestamp(stats.st_mtime)
616 615 info = dict(
617 616 id = checkpoint_id,
618 617 last_modified = last_modified,
619 618 )
620 619 return info
621 620
622 621 # public checkpoint API
623 622
624 623 def create_checkpoint(self, path):
625 624 """Create a checkpoint from the current state of a file"""
626 625 path = path.strip('/')
627 626 if not self.file_exists(path):
628 627 raise web.HTTPError(404)
629 628 src_path = self._get_os_path(path)
630 629 # only the one checkpoint ID:
631 630 checkpoint_id = u"checkpoint"
632 631 cp_path = self.get_checkpoint_path(checkpoint_id, path)
633 632 self.log.debug("creating checkpoint for %s", path)
634 633 with self.perm_to_403():
635 634 self._copy(src_path, cp_path)
636 635
637 636 # return the checkpoint info
638 637 return self.get_checkpoint_model(checkpoint_id, path)
639 638
640 639 def list_checkpoints(self, path):
641 640 """list the checkpoints for a given file
642 641
643 642 This contents manager currently only supports one checkpoint per file.
644 643 """
645 644 path = path.strip('/')
646 645 checkpoint_id = "checkpoint"
647 646 os_path = self.get_checkpoint_path(checkpoint_id, path)
648 647 if not os.path.exists(os_path):
649 648 return []
650 649 else:
651 650 return [self.get_checkpoint_model(checkpoint_id, path)]
652 651
653 652
654 653 def restore_checkpoint(self, checkpoint_id, path):
655 654 """restore a file to a checkpointed state"""
656 655 path = path.strip('/')
657 656 self.log.info("restoring %s from checkpoint %s", path, checkpoint_id)
658 657 nb_path = self._get_os_path(path)
659 658 cp_path = self.get_checkpoint_path(checkpoint_id, path)
660 659 if not os.path.isfile(cp_path):
661 660 self.log.debug("checkpoint file does not exist: %s", cp_path)
662 661 raise web.HTTPError(404,
663 662 u'checkpoint does not exist: %s@%s' % (path, checkpoint_id)
664 663 )
665 664 # ensure notebook is readable (never restore from an unreadable notebook)
666 665 if cp_path.endswith('.ipynb'):
667 666 with self.open(cp_path, 'r', encoding='utf-8') as f:
668 667 nbformat.read(f, as_version=4)
669 668 self.log.debug("copying %s -> %s", cp_path, nb_path)
670 669 with self.perm_to_403():
671 670 self._copy(cp_path, nb_path)
672 671
673 672 def delete_checkpoint(self, checkpoint_id, path):
674 673 """delete a file's checkpoint"""
675 674 path = path.strip('/')
676 675 cp_path = self.get_checkpoint_path(checkpoint_id, path)
677 676 if not os.path.isfile(cp_path):
678 677 raise web.HTTPError(404,
679 678 u'Checkpoint does not exist: %s@%s' % (path, checkpoint_id)
680 679 )
681 680 self.log.debug("unlinking %s", cp_path)
682 681 os.unlink(cp_path)
683 682
684 683 def info_string(self):
685 684 return "Serving notebooks from local directory: %s" % self.root_dir
686 685
687 686 def get_kernel_path(self, path, model=None):
688 687 """Return the initial API path of a kernel associated with a given notebook"""
689 688 if '/' in path:
690 689 parent_dir = path.rsplit('/', 1)[0]
691 690 else:
692 691 parent_dir = ''
693 692 return parent_dir
General Comments 0
You need to be logged in to leave comments. Login now