##// END OF EJS Templates
BUG: Set default mimetype when base64 is requested
Scott Sanderson -
Show More
@@ -1,695 +1,696 b''
1 1 """A contents manager that uses the local file system for storage."""
2 2
3 3 # Copyright (c) IPython Development Team.
4 4 # Distributed under the terms of the Modified BSD License.
5 5
6 6 import base64
7 7 import errno
8 8 import io
9 9 import os
10 10 import shutil
11 11 from contextlib import contextmanager
12 12 import mimetypes
13 13
14 14 from tornado import web
15 15
16 16 from .manager import ContentsManager
17 17 from IPython import nbformat
18 18 from IPython.utils.io import atomic_writing
19 19 from IPython.utils.importstring import import_item
20 20 from IPython.utils.path import ensure_dir_exists
21 21 from IPython.utils.traitlets import Any, Unicode, Bool, TraitError
22 22 from IPython.utils.py3compat import getcwd, str_to_unicode, string_types
23 23 from IPython.utils import tz
24 24 from IPython.html.utils import is_hidden, to_os_path, to_api_path
25 25
26 26 _script_exporter = None
27 27
28 28 def _post_save_script(model, os_path, contents_manager, **kwargs):
29 29 """convert notebooks to Python script after save with nbconvert
30 30
31 31 replaces `ipython notebook --script`
32 32 """
33 33 from IPython.nbconvert.exporters.script import ScriptExporter
34 34
35 35 if model['type'] != 'notebook':
36 36 return
37 37
38 38 global _script_exporter
39 39 if _script_exporter is None:
40 40 _script_exporter = ScriptExporter(parent=contents_manager)
41 41 log = contents_manager.log
42 42
43 43 base, ext = os.path.splitext(os_path)
44 44 py_fname = base + '.py'
45 45 script, resources = _script_exporter.from_filename(os_path)
46 46 script_fname = base + resources.get('output_extension', '.txt')
47 47 log.info("Saving script /%s", to_api_path(script_fname, contents_manager.root_dir))
48 48 with io.open(script_fname, 'w', encoding='utf-8') as f:
49 49 f.write(script)
50 50
51 51 class FileContentsManager(ContentsManager):
52 52
53 53 root_dir = Unicode(config=True)
54 54
55 55 def _root_dir_default(self):
56 56 try:
57 57 return self.parent.notebook_dir
58 58 except AttributeError:
59 59 return getcwd()
60 60
61 61 @contextmanager
62 62 def perm_to_403(self, os_path=''):
63 63 """context manager for turning permission errors into 403"""
64 64 try:
65 65 yield
66 66 except OSError as e:
67 67 if e.errno in {errno.EPERM, errno.EACCES}:
68 68 # make 403 error message without root prefix
69 69 # this may not work perfectly on unicode paths on Python 2,
70 70 # but nobody should be doing that anyway.
71 71 if not os_path:
72 72 os_path = str_to_unicode(e.filename or 'unknown file')
73 73 path = to_api_path(os_path, self.root_dir)
74 74 raise web.HTTPError(403, u'Permission denied: %s' % path)
75 75 else:
76 76 raise
77 77
78 78 @contextmanager
79 79 def open(self, os_path, *args, **kwargs):
80 80 """wrapper around io.open that turns permission errors into 403"""
81 81 with self.perm_to_403(os_path):
82 82 with io.open(os_path, *args, **kwargs) as f:
83 83 yield f
84 84
85 85 @contextmanager
86 86 def atomic_writing(self, os_path, *args, **kwargs):
87 87 """wrapper around atomic_writing that turns permission errors into 403"""
88 88 with self.perm_to_403(os_path):
89 89 with atomic_writing(os_path, *args, **kwargs) as f:
90 90 yield f
91 91
92 92 save_script = Bool(False, config=True, help='DEPRECATED, use post_save_hook')
93 93 def _save_script_changed(self):
94 94 self.log.warn("""
95 95 `--script` is deprecated. You can trigger nbconvert via pre- or post-save hooks:
96 96
97 97 ContentsManager.pre_save_hook
98 98 FileContentsManager.post_save_hook
99 99
100 100 A post-save hook has been registered that calls:
101 101
102 102 ipython nbconvert --to script [notebook]
103 103
104 104 which behaves similarly to `--script`.
105 105 """)
106 106
107 107 self.post_save_hook = _post_save_script
108 108
109 109 post_save_hook = Any(None, config=True,
110 110 help="""Python callable or importstring thereof
111 111
112 112 to be called on the path of a file just saved.
113 113
114 114 This can be used to process the file on disk,
115 115 such as converting the notebook to a script or HTML via nbconvert.
116 116
117 117 It will be called as (all arguments passed by keyword):
118 118
119 119 hook(os_path=os_path, model=model, contents_manager=instance)
120 120
121 121 path: the filesystem path to the file just written
122 122 model: the model representing the file
123 123 contents_manager: this ContentsManager instance
124 124 """
125 125 )
126 126 def _post_save_hook_changed(self, name, old, new):
127 127 if new and isinstance(new, string_types):
128 128 self.post_save_hook = import_item(self.post_save_hook)
129 129 elif new:
130 130 if not callable(new):
131 131 raise TraitError("post_save_hook must be callable")
132 132
133 133 def run_post_save_hook(self, model, os_path):
134 134 """Run the post-save hook if defined, and log errors"""
135 135 if self.post_save_hook:
136 136 try:
137 137 self.log.debug("Running post-save hook on %s", os_path)
138 138 self.post_save_hook(os_path=os_path, model=model, contents_manager=self)
139 139 except Exception:
140 140 self.log.error("Post-save hook failed on %s", os_path, exc_info=True)
141 141
142 142 def _root_dir_changed(self, name, old, new):
143 143 """Do a bit of validation of the root_dir."""
144 144 if not os.path.isabs(new):
145 145 # If we receive a non-absolute path, make it absolute.
146 146 self.root_dir = os.path.abspath(new)
147 147 return
148 148 if not os.path.isdir(new):
149 149 raise TraitError("%r is not a directory" % new)
150 150
151 151 checkpoint_dir = Unicode('.ipynb_checkpoints', config=True,
152 152 help="""The directory name in which to keep file checkpoints
153 153
154 154 This is a path relative to the file's own directory.
155 155
156 156 By default, it is .ipynb_checkpoints
157 157 """
158 158 )
159 159
160 160 def _copy(self, src, dest):
161 161 """copy src to dest
162 162
163 163 like shutil.copy2, but log errors in copystat
164 164 """
165 165 shutil.copyfile(src, dest)
166 166 try:
167 167 shutil.copystat(src, dest)
168 168 except OSError as e:
169 169 self.log.debug("copystat on %s failed", dest, exc_info=True)
170 170
171 171 def _get_os_path(self, path):
172 172 """Given an API path, return its file system path.
173 173
174 174 Parameters
175 175 ----------
176 176 path : string
177 177 The relative API path to the named file.
178 178
179 179 Returns
180 180 -------
181 181 path : string
182 182 Native, absolute OS path to for a file.
183 183 """
184 184 return to_os_path(path, self.root_dir)
185 185
186 186 def dir_exists(self, path):
187 187 """Does the API-style path refer to an extant directory?
188 188
189 189 API-style wrapper for os.path.isdir
190 190
191 191 Parameters
192 192 ----------
193 193 path : string
194 194 The path to check. This is an API path (`/` separated,
195 195 relative to root_dir).
196 196
197 197 Returns
198 198 -------
199 199 exists : bool
200 200 Whether the path is indeed a directory.
201 201 """
202 202 path = path.strip('/')
203 203 os_path = self._get_os_path(path=path)
204 204 return os.path.isdir(os_path)
205 205
206 206 def is_hidden(self, path):
207 207 """Does the API style path correspond to a hidden directory or file?
208 208
209 209 Parameters
210 210 ----------
211 211 path : string
212 212 The path to check. This is an API path (`/` separated,
213 213 relative to root_dir).
214 214
215 215 Returns
216 216 -------
217 217 hidden : bool
218 218 Whether the path exists and is hidden.
219 219 """
220 220 path = path.strip('/')
221 221 os_path = self._get_os_path(path=path)
222 222 return is_hidden(os_path, self.root_dir)
223 223
224 224 def file_exists(self, path):
225 225 """Returns True if the file exists, else returns False.
226 226
227 227 API-style wrapper for os.path.isfile
228 228
229 229 Parameters
230 230 ----------
231 231 path : string
232 232 The relative path to the file (with '/' as separator)
233 233
234 234 Returns
235 235 -------
236 236 exists : bool
237 237 Whether the file exists.
238 238 """
239 239 path = path.strip('/')
240 240 os_path = self._get_os_path(path)
241 241 return os.path.isfile(os_path)
242 242
243 243 def exists(self, path):
244 244 """Returns True if the path exists, else returns False.
245 245
246 246 API-style wrapper for os.path.exists
247 247
248 248 Parameters
249 249 ----------
250 250 path : string
251 251 The API path to the file (with '/' as separator)
252 252
253 253 Returns
254 254 -------
255 255 exists : bool
256 256 Whether the target exists.
257 257 """
258 258 path = path.strip('/')
259 259 os_path = self._get_os_path(path=path)
260 260 return os.path.exists(os_path)
261 261
262 262 def _base_model(self, path):
263 263 """Build the common base of a contents model"""
264 264 os_path = self._get_os_path(path)
265 265 info = os.stat(os_path)
266 266 last_modified = tz.utcfromtimestamp(info.st_mtime)
267 267 created = tz.utcfromtimestamp(info.st_ctime)
268 268 # Create the base model.
269 269 model = {}
270 270 model['name'] = path.rsplit('/', 1)[-1]
271 271 model['path'] = path
272 272 model['last_modified'] = last_modified
273 273 model['created'] = created
274 274 model['content'] = None
275 275 model['format'] = None
276 276 model['mimetype'] = None
277 277 try:
278 278 model['writable'] = os.access(os_path, os.W_OK)
279 279 except OSError:
280 280 self.log.error("Failed to check write permissions on %s", os_path)
281 281 model['writable'] = False
282 282 return model
283 283
284 284 def _dir_model(self, path, content=True):
285 285 """Build a model for a directory
286 286
287 287 if content is requested, will include a listing of the directory
288 288 """
289 289 os_path = self._get_os_path(path)
290 290
291 291 four_o_four = u'directory does not exist: %r' % path
292 292
293 293 if not os.path.isdir(os_path):
294 294 raise web.HTTPError(404, four_o_four)
295 295 elif is_hidden(os_path, self.root_dir):
296 296 self.log.info("Refusing to serve hidden directory %r, via 404 Error",
297 297 os_path
298 298 )
299 299 raise web.HTTPError(404, four_o_four)
300 300
301 301 model = self._base_model(path)
302 302 model['type'] = 'directory'
303 303 if content:
304 304 model['content'] = contents = []
305 305 os_dir = self._get_os_path(path)
306 306 for name in os.listdir(os_dir):
307 307 os_path = os.path.join(os_dir, name)
308 308 # skip over broken symlinks in listing
309 309 if not os.path.exists(os_path):
310 310 self.log.warn("%s doesn't exist", os_path)
311 311 continue
312 312 elif not os.path.isfile(os_path) and not os.path.isdir(os_path):
313 313 self.log.debug("%s not a regular file", os_path)
314 314 continue
315 315 if self.should_list(name) and not is_hidden(os_path, self.root_dir):
316 316 contents.append(self.get(
317 317 path='%s/%s' % (path, name),
318 318 content=False)
319 319 )
320 320
321 321 model['format'] = 'json'
322 322
323 323 return model
324 324
325 325 def _file_model(self, path, content=True, format=None):
326 326 """Build a model for a file
327 327
328 328 if content is requested, include the file contents.
329 329
330 330 format:
331 331 If 'text', the contents will be decoded as UTF-8.
332 332 If 'base64', the raw bytes contents will be encoded as base64.
333 333 If not specified, try to decode as UTF-8, and fall back to base64
334 334 """
335 335 model = self._base_model(path)
336 336 model['type'] = 'file'
337 337
338 338 os_path = self._get_os_path(path)
339 339
340 340 if content:
341 341 if not os.path.isfile(os_path):
342 342 # could be FIFO
343 343 raise web.HTTPError(400, "Cannot get content of non-file %s" % os_path)
344 344 with self.open(os_path, 'rb') as f:
345 345 bcontent = f.read()
346 346
347 347 if format != 'base64':
348 348 try:
349 349 model['content'] = bcontent.decode('utf8')
350 350 except UnicodeError as e:
351 351 if format == 'text':
352 352 raise web.HTTPError(400, "%s is not UTF-8 encoded" % path, reason='bad format')
353 353 else:
354 354 model['format'] = 'text'
355 355 default_mime = 'text/plain'
356 356
357 357 if model['content'] is None:
358 358 model['content'] = base64.encodestring(bcontent).decode('ascii')
359 359 model['format'] = 'base64'
360 if model['format'] == 'base64':
360 361 default_mime = 'application/octet-stream'
361 362
362 363 model['mimetype'] = mimetypes.guess_type(os_path)[0] or default_mime
363 364
364 365 return model
365 366
366 367
367 368 def _notebook_model(self, path, content=True):
368 369 """Build a notebook model
369 370
370 371 if content is requested, the notebook content will be populated
371 372 as a JSON structure (not double-serialized)
372 373 """
373 374 model = self._base_model(path)
374 375 model['type'] = 'notebook'
375 376 if content:
376 377 os_path = self._get_os_path(path)
377 378 with self.open(os_path, 'r', encoding='utf-8') as f:
378 379 try:
379 380 nb = nbformat.read(f, as_version=4)
380 381 except Exception as e:
381 382 raise web.HTTPError(400, u"Unreadable Notebook: %s %r" % (os_path, e))
382 383 self.mark_trusted_cells(nb, path)
383 384 model['content'] = nb
384 385 model['format'] = 'json'
385 386 self.validate_notebook_model(model)
386 387 return model
387 388
388 389 def get(self, path, content=True, type=None, format=None):
389 390 """ Takes a path for an entity and returns its model
390 391
391 392 Parameters
392 393 ----------
393 394 path : str
394 395 the API path that describes the relative path for the target
395 396 content : bool
396 397 Whether to include the contents in the reply
397 398 type : str, optional
398 399 The requested type - 'file', 'notebook', or 'directory'.
399 400 Will raise HTTPError 400 if the content doesn't match.
400 401 format : str, optional
401 402 The requested format for file contents. 'text' or 'base64'.
402 403 Ignored if this returns a notebook or directory model.
403 404
404 405 Returns
405 406 -------
406 407 model : dict
407 408 the contents model. If content=True, returns the contents
408 409 of the file or directory as well.
409 410 """
410 411 path = path.strip('/')
411 412
412 413 if not self.exists(path):
413 414 raise web.HTTPError(404, u'No such file or directory: %s' % path)
414 415
415 416 os_path = self._get_os_path(path)
416 417 if os.path.isdir(os_path):
417 418 if type not in (None, 'directory'):
418 419 raise web.HTTPError(400,
419 420 u'%s is a directory, not a %s' % (path, type), reason='bad type')
420 421 model = self._dir_model(path, content=content)
421 422 elif type == 'notebook' or (type is None and path.endswith('.ipynb')):
422 423 model = self._notebook_model(path, content=content)
423 424 else:
424 425 if type == 'directory':
425 426 raise web.HTTPError(400,
426 427 u'%s is not a directory', reason='bad type')
427 428 model = self._file_model(path, content=content, format=format)
428 429 return model
429 430
430 431 def _save_notebook(self, os_path, model, path=''):
431 432 """save a notebook file"""
432 433 # Save the notebook file
433 434 nb = nbformat.from_dict(model['content'])
434 435
435 436 self.check_and_sign(nb, path)
436 437
437 438 with self.atomic_writing(os_path, encoding='utf-8') as f:
438 439 nbformat.write(nb, f, version=nbformat.NO_CONVERT)
439 440
440 441 def _save_file(self, os_path, model, path=''):
441 442 """save a non-notebook file"""
442 443 fmt = model.get('format', None)
443 444 if fmt not in {'text', 'base64'}:
444 445 raise web.HTTPError(400, "Must specify format of file contents as 'text' or 'base64'")
445 446 try:
446 447 content = model['content']
447 448 if fmt == 'text':
448 449 bcontent = content.encode('utf8')
449 450 else:
450 451 b64_bytes = content.encode('ascii')
451 452 bcontent = base64.decodestring(b64_bytes)
452 453 except Exception as e:
453 454 raise web.HTTPError(400, u'Encoding error saving %s: %s' % (os_path, e))
454 455 with self.atomic_writing(os_path, text=False) as f:
455 456 f.write(bcontent)
456 457
457 458 def _save_directory(self, os_path, model, path=''):
458 459 """create a directory"""
459 460 if is_hidden(os_path, self.root_dir):
460 461 raise web.HTTPError(400, u'Cannot create hidden directory %r' % os_path)
461 462 if not os.path.exists(os_path):
462 463 with self.perm_to_403():
463 464 os.mkdir(os_path)
464 465 elif not os.path.isdir(os_path):
465 466 raise web.HTTPError(400, u'Not a directory: %s' % (os_path))
466 467 else:
467 468 self.log.debug("Directory %r already exists", os_path)
468 469
469 470 def save(self, model, path=''):
470 471 """Save the file model and return the model with no content."""
471 472 path = path.strip('/')
472 473
473 474 if 'type' not in model:
474 475 raise web.HTTPError(400, u'No file type provided')
475 476 if 'content' not in model and model['type'] != 'directory':
476 477 raise web.HTTPError(400, u'No file content provided')
477 478
478 479 self.run_pre_save_hook(model=model, path=path)
479 480
480 481 # One checkpoint should always exist
481 482 if self.file_exists(path) and not self.list_checkpoints(path):
482 483 self.create_checkpoint(path)
483 484
484 485 os_path = self._get_os_path(path)
485 486 self.log.debug("Saving %s", os_path)
486 487 try:
487 488 if model['type'] == 'notebook':
488 489 self._save_notebook(os_path, model, path)
489 490 elif model['type'] == 'file':
490 491 self._save_file(os_path, model, path)
491 492 elif model['type'] == 'directory':
492 493 self._save_directory(os_path, model, path)
493 494 else:
494 495 raise web.HTTPError(400, "Unhandled contents type: %s" % model['type'])
495 496 except web.HTTPError:
496 497 raise
497 498 except Exception as e:
498 499 self.log.error(u'Error while saving file: %s %s', path, e, exc_info=True)
499 500 raise web.HTTPError(500, u'Unexpected error while saving file: %s %s' % (path, e))
500 501
501 502 validation_message = None
502 503 if model['type'] == 'notebook':
503 504 self.validate_notebook_model(model)
504 505 validation_message = model.get('message', None)
505 506
506 507 model = self.get(path, content=False)
507 508 if validation_message:
508 509 model['message'] = validation_message
509 510
510 511 self.run_post_save_hook(model=model, os_path=os_path)
511 512
512 513 return model
513 514
514 515 def update(self, model, path):
515 516 """Update the file's path
516 517
517 518 For use in PATCH requests, to enable renaming a file without
518 519 re-uploading its contents. Only used for renaming at the moment.
519 520 """
520 521 path = path.strip('/')
521 522 new_path = model.get('path', path).strip('/')
522 523 if path != new_path:
523 524 self.rename(path, new_path)
524 525 model = self.get(new_path, content=False)
525 526 return model
526 527
527 528 def delete(self, path):
528 529 """Delete file at path."""
529 530 path = path.strip('/')
530 531 os_path = self._get_os_path(path)
531 532 rm = os.unlink
532 533 if os.path.isdir(os_path):
533 534 listing = os.listdir(os_path)
534 535 # don't delete non-empty directories (checkpoints dir doesn't count)
535 536 if listing and listing != [self.checkpoint_dir]:
536 537 raise web.HTTPError(400, u'Directory %s not empty' % os_path)
537 538 elif not os.path.isfile(os_path):
538 539 raise web.HTTPError(404, u'File does not exist: %s' % os_path)
539 540
540 541 # clear checkpoints
541 542 for checkpoint in self.list_checkpoints(path):
542 543 checkpoint_id = checkpoint['id']
543 544 cp_path = self.get_checkpoint_path(checkpoint_id, path)
544 545 if os.path.isfile(cp_path):
545 546 self.log.debug("Unlinking checkpoint %s", cp_path)
546 547 with self.perm_to_403():
547 548 rm(cp_path)
548 549
549 550 if os.path.isdir(os_path):
550 551 self.log.debug("Removing directory %s", os_path)
551 552 with self.perm_to_403():
552 553 shutil.rmtree(os_path)
553 554 else:
554 555 self.log.debug("Unlinking file %s", os_path)
555 556 with self.perm_to_403():
556 557 rm(os_path)
557 558
558 559 def rename(self, old_path, new_path):
559 560 """Rename a file."""
560 561 old_path = old_path.strip('/')
561 562 new_path = new_path.strip('/')
562 563 if new_path == old_path:
563 564 return
564 565
565 566 new_os_path = self._get_os_path(new_path)
566 567 old_os_path = self._get_os_path(old_path)
567 568
568 569 # Should we proceed with the move?
569 570 if os.path.exists(new_os_path):
570 571 raise web.HTTPError(409, u'File already exists: %s' % new_path)
571 572
572 573 # Move the file
573 574 try:
574 575 with self.perm_to_403():
575 576 shutil.move(old_os_path, new_os_path)
576 577 except web.HTTPError:
577 578 raise
578 579 except Exception as e:
579 580 raise web.HTTPError(500, u'Unknown error renaming file: %s %s' % (old_path, e))
580 581
581 582 # Move the checkpoints
582 583 old_checkpoints = self.list_checkpoints(old_path)
583 584 for cp in old_checkpoints:
584 585 checkpoint_id = cp['id']
585 586 old_cp_path = self.get_checkpoint_path(checkpoint_id, old_path)
586 587 new_cp_path = self.get_checkpoint_path(checkpoint_id, new_path)
587 588 if os.path.isfile(old_cp_path):
588 589 self.log.debug("Renaming checkpoint %s -> %s", old_cp_path, new_cp_path)
589 590 with self.perm_to_403():
590 591 shutil.move(old_cp_path, new_cp_path)
591 592
592 593 # Checkpoint-related utilities
593 594
594 595 def get_checkpoint_path(self, checkpoint_id, path):
595 596 """find the path to a checkpoint"""
596 597 path = path.strip('/')
597 598 parent, name = ('/' + path).rsplit('/', 1)
598 599 parent = parent.strip('/')
599 600 basename, ext = os.path.splitext(name)
600 601 filename = u"{name}-{checkpoint_id}{ext}".format(
601 602 name=basename,
602 603 checkpoint_id=checkpoint_id,
603 604 ext=ext,
604 605 )
605 606 os_path = self._get_os_path(path=parent)
606 607 cp_dir = os.path.join(os_path, self.checkpoint_dir)
607 608 with self.perm_to_403():
608 609 ensure_dir_exists(cp_dir)
609 610 cp_path = os.path.join(cp_dir, filename)
610 611 return cp_path
611 612
612 613 def get_checkpoint_model(self, checkpoint_id, path):
613 614 """construct the info dict for a given checkpoint"""
614 615 path = path.strip('/')
615 616 cp_path = self.get_checkpoint_path(checkpoint_id, path)
616 617 stats = os.stat(cp_path)
617 618 last_modified = tz.utcfromtimestamp(stats.st_mtime)
618 619 info = dict(
619 620 id = checkpoint_id,
620 621 last_modified = last_modified,
621 622 )
622 623 return info
623 624
624 625 # public checkpoint API
625 626
626 627 def create_checkpoint(self, path):
627 628 """Create a checkpoint from the current state of a file"""
628 629 path = path.strip('/')
629 630 if not self.file_exists(path):
630 631 raise web.HTTPError(404)
631 632 src_path = self._get_os_path(path)
632 633 # only the one checkpoint ID:
633 634 checkpoint_id = u"checkpoint"
634 635 cp_path = self.get_checkpoint_path(checkpoint_id, path)
635 636 self.log.debug("creating checkpoint for %s", path)
636 637 with self.perm_to_403():
637 638 self._copy(src_path, cp_path)
638 639
639 640 # return the checkpoint info
640 641 return self.get_checkpoint_model(checkpoint_id, path)
641 642
642 643 def list_checkpoints(self, path):
643 644 """list the checkpoints for a given file
644 645
645 646 This contents manager currently only supports one checkpoint per file.
646 647 """
647 648 path = path.strip('/')
648 649 checkpoint_id = "checkpoint"
649 650 os_path = self.get_checkpoint_path(checkpoint_id, path)
650 651 if not os.path.exists(os_path):
651 652 return []
652 653 else:
653 654 return [self.get_checkpoint_model(checkpoint_id, path)]
654 655
655 656
656 657 def restore_checkpoint(self, checkpoint_id, path):
657 658 """restore a file to a checkpointed state"""
658 659 path = path.strip('/')
659 660 self.log.info("restoring %s from checkpoint %s", path, checkpoint_id)
660 661 nb_path = self._get_os_path(path)
661 662 cp_path = self.get_checkpoint_path(checkpoint_id, path)
662 663 if not os.path.isfile(cp_path):
663 664 self.log.debug("checkpoint file does not exist: %s", cp_path)
664 665 raise web.HTTPError(404,
665 666 u'checkpoint does not exist: %s@%s' % (path, checkpoint_id)
666 667 )
667 668 # ensure notebook is readable (never restore from an unreadable notebook)
668 669 if cp_path.endswith('.ipynb'):
669 670 with self.open(cp_path, 'r', encoding='utf-8') as f:
670 671 nbformat.read(f, as_version=4)
671 672 self.log.debug("copying %s -> %s", cp_path, nb_path)
672 673 with self.perm_to_403():
673 674 self._copy(cp_path, nb_path)
674 675
675 676 def delete_checkpoint(self, checkpoint_id, path):
676 677 """delete a file's checkpoint"""
677 678 path = path.strip('/')
678 679 cp_path = self.get_checkpoint_path(checkpoint_id, path)
679 680 if not os.path.isfile(cp_path):
680 681 raise web.HTTPError(404,
681 682 u'Checkpoint does not exist: %s@%s' % (path, checkpoint_id)
682 683 )
683 684 self.log.debug("unlinking %s", cp_path)
684 685 os.unlink(cp_path)
685 686
686 687 def info_string(self):
687 688 return "Serving notebooks from local directory: %s" % self.root_dir
688 689
689 690 def get_kernel_path(self, path, model=None):
690 691 """Return the initial API path of a kernel associated with a given notebook"""
691 692 if '/' in path:
692 693 parent_dir = path.rsplit('/', 1)[0]
693 694 else:
694 695 parent_dir = ''
695 696 return parent_dir
General Comments 0
You need to be logged in to leave comments. Login now