##// END OF EJS Templates
ContentsManager type kwarg to match model key...
Min RK -
Show More
@@ -1,622 +1,622 b''
1 1 """A contents manager that uses the local file system for storage."""
2 2
3 3 # Copyright (c) IPython Development Team.
4 4 # Distributed under the terms of the Modified BSD License.
5 5
6 6 import base64
7 7 import errno
8 8 import io
9 9 import os
10 10 import shutil
11 11 from contextlib import contextmanager
12 12 import mimetypes
13 13
14 14 from tornado import web
15 15
16 16 from .manager import ContentsManager
17 17 from IPython import nbformat
18 18 from IPython.utils.io import atomic_writing
19 19 from IPython.utils.path import ensure_dir_exists
20 20 from IPython.utils.traitlets import Unicode, Bool, TraitError
21 21 from IPython.utils.py3compat import getcwd, str_to_unicode
22 22 from IPython.utils import tz
23 23 from IPython.html.utils import is_hidden, to_os_path, to_api_path
24 24
25 25
26 26 class FileContentsManager(ContentsManager):
27 27
28 28 root_dir = Unicode(config=True)
29 29
30 30 def _root_dir_default(self):
31 31 try:
32 32 return self.parent.notebook_dir
33 33 except AttributeError:
34 34 return getcwd()
35 35
36 36 @contextmanager
37 37 def perm_to_403(self, os_path=''):
38 38 """context manager for turning permission errors into 403"""
39 39 try:
40 40 yield
41 41 except OSError as e:
42 42 if e.errno in {errno.EPERM, errno.EACCES}:
43 43 # make 403 error message without root prefix
44 44 # this may not work perfectly on unicode paths on Python 2,
45 45 # but nobody should be doing that anyway.
46 46 if not os_path:
47 47 os_path = str_to_unicode(e.filename or 'unknown file')
48 48 path = to_api_path(os_path, self.root_dir)
49 49 raise web.HTTPError(403, u'Permission denied: %s' % path)
50 50 else:
51 51 raise
52 52
53 53 @contextmanager
54 54 def open(self, os_path, *args, **kwargs):
55 55 """wrapper around io.open that turns permission errors into 403"""
56 56 with self.perm_to_403(os_path):
57 57 with io.open(os_path, *args, **kwargs) as f:
58 58 yield f
59 59
60 60 @contextmanager
61 61 def atomic_writing(self, os_path, *args, **kwargs):
62 62 """wrapper around atomic_writing that turns permission errors into 403"""
63 63 with self.perm_to_403(os_path):
64 64 with atomic_writing(os_path, *args, **kwargs) as f:
65 65 yield f
66 66
67 67 save_script = Bool(False, config=True, help='DEPRECATED, IGNORED')
68 68 def _save_script_changed(self):
69 69 self.log.warn("""
70 70 Automatically saving notebooks as scripts has been removed.
71 71 Use `ipython nbconvert --to python [notebook]` instead.
72 72 """)
73 73
74 74 def _root_dir_changed(self, name, old, new):
75 75 """Do a bit of validation of the root_dir."""
76 76 if not os.path.isabs(new):
77 77 # If we receive a non-absolute path, make it absolute.
78 78 self.root_dir = os.path.abspath(new)
79 79 return
80 80 if not os.path.isdir(new):
81 81 raise TraitError("%r is not a directory" % new)
82 82
83 83 checkpoint_dir = Unicode('.ipynb_checkpoints', config=True,
84 84 help="""The directory name in which to keep file checkpoints
85 85
86 86 This is a path relative to the file's own directory.
87 87
88 88 By default, it is .ipynb_checkpoints
89 89 """
90 90 )
91 91
92 92 def _copy(self, src, dest):
93 93 """copy src to dest
94 94
95 95 like shutil.copy2, but log errors in copystat
96 96 """
97 97 shutil.copyfile(src, dest)
98 98 try:
99 99 shutil.copystat(src, dest)
100 100 except OSError as e:
101 101 self.log.debug("copystat on %s failed", dest, exc_info=True)
102 102
103 103 def _get_os_path(self, path):
104 104 """Given an API path, return its file system path.
105 105
106 106 Parameters
107 107 ----------
108 108 path : string
109 109 The relative API path to the named file.
110 110
111 111 Returns
112 112 -------
113 113 path : string
114 114 Native, absolute OS path to for a file.
115 115 """
116 116 return to_os_path(path, self.root_dir)
117 117
118 118 def dir_exists(self, path):
119 119 """Does the API-style path refer to an extant directory?
120 120
121 121 API-style wrapper for os.path.isdir
122 122
123 123 Parameters
124 124 ----------
125 125 path : string
126 126 The path to check. This is an API path (`/` separated,
127 127 relative to root_dir).
128 128
129 129 Returns
130 130 -------
131 131 exists : bool
132 132 Whether the path is indeed a directory.
133 133 """
134 134 path = path.strip('/')
135 135 os_path = self._get_os_path(path=path)
136 136 return os.path.isdir(os_path)
137 137
138 138 def is_hidden(self, path):
139 139 """Does the API style path correspond to a hidden directory or file?
140 140
141 141 Parameters
142 142 ----------
143 143 path : string
144 144 The path to check. This is an API path (`/` separated,
145 145 relative to root_dir).
146 146
147 147 Returns
148 148 -------
149 149 hidden : bool
150 150 Whether the path exists and is hidden.
151 151 """
152 152 path = path.strip('/')
153 153 os_path = self._get_os_path(path=path)
154 154 return is_hidden(os_path, self.root_dir)
155 155
156 156 def file_exists(self, path):
157 157 """Returns True if the file exists, else returns False.
158 158
159 159 API-style wrapper for os.path.isfile
160 160
161 161 Parameters
162 162 ----------
163 163 path : string
164 164 The relative path to the file (with '/' as separator)
165 165
166 166 Returns
167 167 -------
168 168 exists : bool
169 169 Whether the file exists.
170 170 """
171 171 path = path.strip('/')
172 172 os_path = self._get_os_path(path)
173 173 return os.path.isfile(os_path)
174 174
175 175 def exists(self, path):
176 176 """Returns True if the path exists, else returns False.
177 177
178 178 API-style wrapper for os.path.exists
179 179
180 180 Parameters
181 181 ----------
182 182 path : string
183 183 The API path to the file (with '/' as separator)
184 184
185 185 Returns
186 186 -------
187 187 exists : bool
188 188 Whether the target exists.
189 189 """
190 190 path = path.strip('/')
191 191 os_path = self._get_os_path(path=path)
192 192 return os.path.exists(os_path)
193 193
194 194 def _base_model(self, path):
195 195 """Build the common base of a contents model"""
196 196 os_path = self._get_os_path(path)
197 197 info = os.stat(os_path)
198 198 last_modified = tz.utcfromtimestamp(info.st_mtime)
199 199 created = tz.utcfromtimestamp(info.st_ctime)
200 200 # Create the base model.
201 201 model = {}
202 202 model['name'] = path.rsplit('/', 1)[-1]
203 203 model['path'] = path
204 204 model['last_modified'] = last_modified
205 205 model['created'] = created
206 206 model['content'] = None
207 207 model['format'] = None
208 208 model['mimetype'] = None
209 209 try:
210 210 model['writable'] = os.access(os_path, os.W_OK)
211 211 except OSError:
212 212 self.log.error("Failed to check write permissions on %s", os_path)
213 213 model['writable'] = False
214 214 return model
215 215
216 216 def _dir_model(self, path, content=True):
217 217 """Build a model for a directory
218 218
219 219 if content is requested, will include a listing of the directory
220 220 """
221 221 os_path = self._get_os_path(path)
222 222
223 223 four_o_four = u'directory does not exist: %r' % path
224 224
225 225 if not os.path.isdir(os_path):
226 226 raise web.HTTPError(404, four_o_four)
227 227 elif is_hidden(os_path, self.root_dir):
228 228 self.log.info("Refusing to serve hidden directory %r, via 404 Error",
229 229 os_path
230 230 )
231 231 raise web.HTTPError(404, four_o_four)
232 232
233 233 model = self._base_model(path)
234 234 model['type'] = 'directory'
235 235 if content:
236 236 model['content'] = contents = []
237 237 os_dir = self._get_os_path(path)
238 238 for name in os.listdir(os_dir):
239 239 os_path = os.path.join(os_dir, name)
240 240 # skip over broken symlinks in listing
241 241 if not os.path.exists(os_path):
242 242 self.log.warn("%s doesn't exist", os_path)
243 243 continue
244 244 elif not os.path.isfile(os_path) and not os.path.isdir(os_path):
245 245 self.log.debug("%s not a regular file", os_path)
246 246 continue
247 247 if self.should_list(name) and not is_hidden(os_path, self.root_dir):
248 248 contents.append(self.get(
249 249 path='%s/%s' % (path, name),
250 250 content=False)
251 251 )
252 252
253 253 model['format'] = 'json'
254 254
255 255 return model
256 256
257 257 def _file_model(self, path, content=True, format=None):
258 258 """Build a model for a file
259 259
260 260 if content is requested, include the file contents.
261 261
262 262 format:
263 263 If 'text', the contents will be decoded as UTF-8.
264 264 If 'base64', the raw bytes contents will be encoded as base64.
265 265 If not specified, try to decode as UTF-8, and fall back to base64
266 266 """
267 267 model = self._base_model(path)
268 268 model['type'] = 'file'
269 269
270 270 os_path = self._get_os_path(path)
271 271
272 272 if content:
273 273 if not os.path.isfile(os_path):
274 274 # could be FIFO
275 275 raise web.HTTPError(400, "Cannot get content of non-file %s" % os_path)
276 276 with self.open(os_path, 'rb') as f:
277 277 bcontent = f.read()
278 278
279 279 if format != 'base64':
280 280 try:
281 281 model['content'] = bcontent.decode('utf8')
282 282 except UnicodeError as e:
283 283 if format == 'text':
284 284 raise web.HTTPError(400, "%s is not UTF-8 encoded" % path, reason='bad format')
285 285 else:
286 286 model['format'] = 'text'
287 287 default_mime = 'text/plain'
288 288
289 289 if model['content'] is None:
290 290 model['content'] = base64.encodestring(bcontent).decode('ascii')
291 291 model['format'] = 'base64'
292 292 default_mime = 'application/octet-stream'
293 293
294 294 model['mimetype'] = mimetypes.guess_type(os_path)[0] or default_mime
295 295
296 296 return model
297 297
298 298
299 299 def _notebook_model(self, path, content=True):
300 300 """Build a notebook model
301 301
302 302 if content is requested, the notebook content will be populated
303 303 as a JSON structure (not double-serialized)
304 304 """
305 305 model = self._base_model(path)
306 306 model['type'] = 'notebook'
307 307 if content:
308 308 os_path = self._get_os_path(path)
309 309 with self.open(os_path, 'r', encoding='utf-8') as f:
310 310 try:
311 311 nb = nbformat.read(f, as_version=4)
312 312 except Exception as e:
313 313 raise web.HTTPError(400, u"Unreadable Notebook: %s %r" % (os_path, e))
314 314 self.mark_trusted_cells(nb, path)
315 315 model['content'] = nb
316 316 model['format'] = 'json'
317 317 self.validate_notebook_model(model)
318 318 return model
319 319
320 def get(self, path, content=True, type_=None, format=None):
320 def get(self, path, content=True, type=None, format=None):
321 321 """ Takes a path for an entity and returns its model
322 322
323 323 Parameters
324 324 ----------
325 325 path : str
326 326 the API path that describes the relative path for the target
327 327 content : bool
328 328 Whether to include the contents in the reply
329 type_ : str, optional
329 type : str, optional
330 330 The requested type - 'file', 'notebook', or 'directory'.
331 331 Will raise HTTPError 400 if the content doesn't match.
332 332 format : str, optional
333 333 The requested format for file contents. 'text' or 'base64'.
334 334 Ignored if this returns a notebook or directory model.
335 335
336 336 Returns
337 337 -------
338 338 model : dict
339 339 the contents model. If content=True, returns the contents
340 340 of the file or directory as well.
341 341 """
342 342 path = path.strip('/')
343 343
344 344 if not self.exists(path):
345 345 raise web.HTTPError(404, u'No such file or directory: %s' % path)
346 346
347 347 os_path = self._get_os_path(path)
348 348 if os.path.isdir(os_path):
349 if type_ not in (None, 'directory'):
349 if type not in (None, 'directory'):
350 350 raise web.HTTPError(400,
351 u'%s is a directory, not a %s' % (path, type_), reason='bad type')
351 u'%s is a directory, not a %s' % (path, type), reason='bad type')
352 352 model = self._dir_model(path, content=content)
353 elif type_ == 'notebook' or (type_ is None and path.endswith('.ipynb')):
353 elif type == 'notebook' or (type is None and path.endswith('.ipynb')):
354 354 model = self._notebook_model(path, content=content)
355 355 else:
356 if type_ == 'directory':
356 if type == 'directory':
357 357 raise web.HTTPError(400,
358 358 u'%s is not a directory', reason='bad type')
359 359 model = self._file_model(path, content=content, format=format)
360 360 return model
361 361
362 362 def _save_notebook(self, os_path, model, path=''):
363 363 """save a notebook file"""
364 364 # Save the notebook file
365 365 nb = nbformat.from_dict(model['content'])
366 366
367 367 self.check_and_sign(nb, path)
368 368
369 369 with self.atomic_writing(os_path, encoding='utf-8') as f:
370 370 nbformat.write(nb, f, version=nbformat.NO_CONVERT)
371 371
372 372 def _save_file(self, os_path, model, path=''):
373 373 """save a non-notebook file"""
374 374 fmt = model.get('format', None)
375 375 if fmt not in {'text', 'base64'}:
376 376 raise web.HTTPError(400, "Must specify format of file contents as 'text' or 'base64'")
377 377 try:
378 378 content = model['content']
379 379 if fmt == 'text':
380 380 bcontent = content.encode('utf8')
381 381 else:
382 382 b64_bytes = content.encode('ascii')
383 383 bcontent = base64.decodestring(b64_bytes)
384 384 except Exception as e:
385 385 raise web.HTTPError(400, u'Encoding error saving %s: %s' % (os_path, e))
386 386 with self.atomic_writing(os_path, text=False) as f:
387 387 f.write(bcontent)
388 388
389 389 def _save_directory(self, os_path, model, path=''):
390 390 """create a directory"""
391 391 if is_hidden(os_path, self.root_dir):
392 392 raise web.HTTPError(400, u'Cannot create hidden directory %r' % os_path)
393 393 if not os.path.exists(os_path):
394 394 with self.perm_to_403():
395 395 os.mkdir(os_path)
396 396 elif not os.path.isdir(os_path):
397 397 raise web.HTTPError(400, u'Not a directory: %s' % (os_path))
398 398 else:
399 399 self.log.debug("Directory %r already exists", os_path)
400 400
401 401 def save(self, model, path=''):
402 402 """Save the file model and return the model with no content."""
403 403 path = path.strip('/')
404 404
405 405 if 'type' not in model:
406 406 raise web.HTTPError(400, u'No file type provided')
407 407 if 'content' not in model and model['type'] != 'directory':
408 408 raise web.HTTPError(400, u'No file content provided')
409 409
410 410 # One checkpoint should always exist
411 411 if self.file_exists(path) and not self.list_checkpoints(path):
412 412 self.create_checkpoint(path)
413 413
414 414 os_path = self._get_os_path(path)
415 415 self.log.debug("Saving %s", os_path)
416 416 try:
417 417 if model['type'] == 'notebook':
418 418 self._save_notebook(os_path, model, path)
419 419 elif model['type'] == 'file':
420 420 self._save_file(os_path, model, path)
421 421 elif model['type'] == 'directory':
422 422 self._save_directory(os_path, model, path)
423 423 else:
424 424 raise web.HTTPError(400, "Unhandled contents type: %s" % model['type'])
425 425 except web.HTTPError:
426 426 raise
427 427 except Exception as e:
428 428 self.log.error(u'Error while saving file: %s %s', path, e, exc_info=True)
429 429 raise web.HTTPError(500, u'Unexpected error while saving file: %s %s' % (path, e))
430 430
431 431 validation_message = None
432 432 if model['type'] == 'notebook':
433 433 self.validate_notebook_model(model)
434 434 validation_message = model.get('message', None)
435 435
436 436 model = self.get(path, content=False)
437 437 if validation_message:
438 438 model['message'] = validation_message
439 439 return model
440 440
441 441 def update(self, model, path):
442 442 """Update the file's path
443 443
444 444 For use in PATCH requests, to enable renaming a file without
445 445 re-uploading its contents. Only used for renaming at the moment.
446 446 """
447 447 path = path.strip('/')
448 448 new_path = model.get('path', path).strip('/')
449 449 if path != new_path:
450 450 self.rename(path, new_path)
451 451 model = self.get(new_path, content=False)
452 452 return model
453 453
454 454 def delete(self, path):
455 455 """Delete file at path."""
456 456 path = path.strip('/')
457 457 os_path = self._get_os_path(path)
458 458 rm = os.unlink
459 459 if os.path.isdir(os_path):
460 460 listing = os.listdir(os_path)
461 461 # don't delete non-empty directories (checkpoints dir doesn't count)
462 462 if listing and listing != [self.checkpoint_dir]:
463 463 raise web.HTTPError(400, u'Directory %s not empty' % os_path)
464 464 elif not os.path.isfile(os_path):
465 465 raise web.HTTPError(404, u'File does not exist: %s' % os_path)
466 466
467 467 # clear checkpoints
468 468 for checkpoint in self.list_checkpoints(path):
469 469 checkpoint_id = checkpoint['id']
470 470 cp_path = self.get_checkpoint_path(checkpoint_id, path)
471 471 if os.path.isfile(cp_path):
472 472 self.log.debug("Unlinking checkpoint %s", cp_path)
473 473 with self.perm_to_403():
474 474 rm(cp_path)
475 475
476 476 if os.path.isdir(os_path):
477 477 self.log.debug("Removing directory %s", os_path)
478 478 with self.perm_to_403():
479 479 shutil.rmtree(os_path)
480 480 else:
481 481 self.log.debug("Unlinking file %s", os_path)
482 482 with self.perm_to_403():
483 483 rm(os_path)
484 484
485 485 def rename(self, old_path, new_path):
486 486 """Rename a file."""
487 487 old_path = old_path.strip('/')
488 488 new_path = new_path.strip('/')
489 489 if new_path == old_path:
490 490 return
491 491
492 492 new_os_path = self._get_os_path(new_path)
493 493 old_os_path = self._get_os_path(old_path)
494 494
495 495 # Should we proceed with the move?
496 496 if os.path.exists(new_os_path):
497 497 raise web.HTTPError(409, u'File already exists: %s' % new_path)
498 498
499 499 # Move the file
500 500 try:
501 501 with self.perm_to_403():
502 502 shutil.move(old_os_path, new_os_path)
503 503 except web.HTTPError:
504 504 raise
505 505 except Exception as e:
506 506 raise web.HTTPError(500, u'Unknown error renaming file: %s %s' % (old_path, e))
507 507
508 508 # Move the checkpoints
509 509 old_checkpoints = self.list_checkpoints(old_path)
510 510 for cp in old_checkpoints:
511 511 checkpoint_id = cp['id']
512 512 old_cp_path = self.get_checkpoint_path(checkpoint_id, old_path)
513 513 new_cp_path = self.get_checkpoint_path(checkpoint_id, new_path)
514 514 if os.path.isfile(old_cp_path):
515 515 self.log.debug("Renaming checkpoint %s -> %s", old_cp_path, new_cp_path)
516 516 with self.perm_to_403():
517 517 shutil.move(old_cp_path, new_cp_path)
518 518
519 519 # Checkpoint-related utilities
520 520
521 521 def get_checkpoint_path(self, checkpoint_id, path):
522 522 """find the path to a checkpoint"""
523 523 path = path.strip('/')
524 524 parent, name = ('/' + path).rsplit('/', 1)
525 525 parent = parent.strip('/')
526 526 basename, ext = os.path.splitext(name)
527 527 filename = u"{name}-{checkpoint_id}{ext}".format(
528 528 name=basename,
529 529 checkpoint_id=checkpoint_id,
530 530 ext=ext,
531 531 )
532 532 os_path = self._get_os_path(path=parent)
533 533 cp_dir = os.path.join(os_path, self.checkpoint_dir)
534 534 with self.perm_to_403():
535 535 ensure_dir_exists(cp_dir)
536 536 cp_path = os.path.join(cp_dir, filename)
537 537 return cp_path
538 538
539 539 def get_checkpoint_model(self, checkpoint_id, path):
540 540 """construct the info dict for a given checkpoint"""
541 541 path = path.strip('/')
542 542 cp_path = self.get_checkpoint_path(checkpoint_id, path)
543 543 stats = os.stat(cp_path)
544 544 last_modified = tz.utcfromtimestamp(stats.st_mtime)
545 545 info = dict(
546 546 id = checkpoint_id,
547 547 last_modified = last_modified,
548 548 )
549 549 return info
550 550
551 551 # public checkpoint API
552 552
553 553 def create_checkpoint(self, path):
554 554 """Create a checkpoint from the current state of a file"""
555 555 path = path.strip('/')
556 556 if not self.file_exists(path):
557 557 raise web.HTTPError(404)
558 558 src_path = self._get_os_path(path)
559 559 # only the one checkpoint ID:
560 560 checkpoint_id = u"checkpoint"
561 561 cp_path = self.get_checkpoint_path(checkpoint_id, path)
562 562 self.log.debug("creating checkpoint for %s", path)
563 563 with self.perm_to_403():
564 564 self._copy(src_path, cp_path)
565 565
566 566 # return the checkpoint info
567 567 return self.get_checkpoint_model(checkpoint_id, path)
568 568
569 569 def list_checkpoints(self, path):
570 570 """list the checkpoints for a given file
571 571
572 572 This contents manager currently only supports one checkpoint per file.
573 573 """
574 574 path = path.strip('/')
575 575 checkpoint_id = "checkpoint"
576 576 os_path = self.get_checkpoint_path(checkpoint_id, path)
577 577 if not os.path.exists(os_path):
578 578 return []
579 579 else:
580 580 return [self.get_checkpoint_model(checkpoint_id, path)]
581 581
582 582
583 583 def restore_checkpoint(self, checkpoint_id, path):
584 584 """restore a file to a checkpointed state"""
585 585 path = path.strip('/')
586 586 self.log.info("restoring %s from checkpoint %s", path, checkpoint_id)
587 587 nb_path = self._get_os_path(path)
588 588 cp_path = self.get_checkpoint_path(checkpoint_id, path)
589 589 if not os.path.isfile(cp_path):
590 590 self.log.debug("checkpoint file does not exist: %s", cp_path)
591 591 raise web.HTTPError(404,
592 592 u'checkpoint does not exist: %s@%s' % (path, checkpoint_id)
593 593 )
594 594 # ensure notebook is readable (never restore from an unreadable notebook)
595 595 if cp_path.endswith('.ipynb'):
596 596 with self.open(cp_path, 'r', encoding='utf-8') as f:
597 597 nbformat.read(f, as_version=4)
598 598 self.log.debug("copying %s -> %s", cp_path, nb_path)
599 599 with self.perm_to_403():
600 600 self._copy(cp_path, nb_path)
601 601
602 602 def delete_checkpoint(self, checkpoint_id, path):
603 603 """delete a file's checkpoint"""
604 604 path = path.strip('/')
605 605 cp_path = self.get_checkpoint_path(checkpoint_id, path)
606 606 if not os.path.isfile(cp_path):
607 607 raise web.HTTPError(404,
608 608 u'Checkpoint does not exist: %s@%s' % (path, checkpoint_id)
609 609 )
610 610 self.log.debug("unlinking %s", cp_path)
611 611 os.unlink(cp_path)
612 612
613 613 def info_string(self):
614 614 return "Serving notebooks from local directory: %s" % self.root_dir
615 615
616 616 def get_kernel_path(self, path, model=None):
617 617 """Return the initial API path of a kernel associated with a given notebook"""
618 618 if '/' in path:
619 619 parent_dir = path.rsplit('/', 1)[0]
620 620 else:
621 621 parent_dir = ''
622 622 return parent_dir
@@ -1,266 +1,266 b''
1 1 """Tornado handlers for the contents web service."""
2 2
3 3 # Copyright (c) IPython Development Team.
4 4 # Distributed under the terms of the Modified BSD License.
5 5
6 6 import json
7 7
8 8 from tornado import web
9 9
10 10 from IPython.html.utils import url_path_join, url_escape
11 11 from IPython.utils.jsonutil import date_default
12 12
13 13 from IPython.html.base.handlers import (
14 14 IPythonHandler, json_errors, path_regex,
15 15 )
16 16
17 17
18 18 def sort_key(model):
19 19 """key function for case-insensitive sort by name and type"""
20 20 iname = model['name'].lower()
21 21 type_key = {
22 22 'directory' : '0',
23 23 'notebook' : '1',
24 24 'file' : '2',
25 25 }.get(model['type'], '9')
26 26 return u'%s%s' % (type_key, iname)
27 27
28 28 class ContentsHandler(IPythonHandler):
29 29
30 30 SUPPORTED_METHODS = (u'GET', u'PUT', u'PATCH', u'POST', u'DELETE')
31 31
32 32 def location_url(self, path):
33 33 """Return the full URL location of a file.
34 34
35 35 Parameters
36 36 ----------
37 37 path : unicode
38 38 The API path of the file, such as "foo/bar.txt".
39 39 """
40 40 return url_escape(url_path_join(
41 41 self.base_url, 'api', 'contents', path
42 42 ))
43 43
44 44 def _finish_model(self, model, location=True):
45 45 """Finish a JSON request with a model, setting relevant headers, etc."""
46 46 if location:
47 47 location = self.location_url(model['path'])
48 48 self.set_header('Location', location)
49 49 self.set_header('Last-Modified', model['last_modified'])
50 50 self.set_header('Content-Type', 'application/json')
51 51 self.finish(json.dumps(model, default=date_default))
52 52
53 53 @web.authenticated
54 54 @json_errors
55 55 def get(self, path=''):
56 56 """Return a model for a file or directory.
57 57
58 58 A directory model contains a list of models (without content)
59 59 of the files and directories it contains.
60 60 """
61 61 path = path or ''
62 type_ = self.get_query_argument('type', default=None)
63 if type_ not in {None, 'directory', 'file', 'notebook'}:
64 raise web.HTTPError(400, u'Type %r is invalid' % type_)
62 type = self.get_query_argument('type', default=None)
63 if type not in {None, 'directory', 'file', 'notebook'}:
64 raise web.HTTPError(400, u'Type %r is invalid' % type)
65 65
66 66 format = self.get_query_argument('format', default=None)
67 67 if format not in {None, 'text', 'base64'}:
68 68 raise web.HTTPError(400, u'Format %r is invalid' % format)
69 69
70 model = self.contents_manager.get(path=path, type_=type_, format=format)
70 model = self.contents_manager.get(path=path, type=type, format=format)
71 71 if model['type'] == 'directory':
72 72 # group listing by type, then by name (case-insensitive)
73 73 # FIXME: sorting should be done in the frontends
74 74 model['content'].sort(key=sort_key)
75 75 self._finish_model(model, location=False)
76 76
77 77 @web.authenticated
78 78 @json_errors
79 79 def patch(self, path=''):
80 80 """PATCH renames a file or directory without re-uploading content."""
81 81 cm = self.contents_manager
82 82 model = self.get_json_body()
83 83 if model is None:
84 84 raise web.HTTPError(400, u'JSON body missing')
85 85 model = cm.update(model, path)
86 86 self._finish_model(model)
87 87
88 88 def _copy(self, copy_from, copy_to=None):
89 89 """Copy a file, optionally specifying a target directory."""
90 90 self.log.info(u"Copying {copy_from} to {copy_to}".format(
91 91 copy_from=copy_from,
92 92 copy_to=copy_to or '',
93 93 ))
94 94 model = self.contents_manager.copy(copy_from, copy_to)
95 95 self.set_status(201)
96 96 self._finish_model(model)
97 97
98 98 def _upload(self, model, path):
99 99 """Handle upload of a new file to path"""
100 100 self.log.info(u"Uploading file to %s", path)
101 101 model = self.contents_manager.new(model, path)
102 102 self.set_status(201)
103 103 self._finish_model(model)
104 104
105 105 def _new_untitled(self, path, type='', ext=''):
106 106 """Create a new, empty untitled entity"""
107 107 self.log.info(u"Creating new %s in %s", type or 'file', path)
108 108 model = self.contents_manager.new_untitled(path=path, type=type, ext=ext)
109 109 self.set_status(201)
110 110 self._finish_model(model)
111 111
112 112 def _save(self, model, path):
113 113 """Save an existing file."""
114 114 self.log.info(u"Saving file at %s", path)
115 115 model = self.contents_manager.save(model, path)
116 116 self._finish_model(model)
117 117
118 118 @web.authenticated
119 119 @json_errors
120 120 def post(self, path=''):
121 121 """Create a new file in the specified path.
122 122
123 123 POST creates new files. The server always decides on the name.
124 124
125 125 POST /api/contents/path
126 126 New untitled, empty file or directory.
127 127 POST /api/contents/path
128 128 with body {"copy_from" : "/path/to/OtherNotebook.ipynb"}
129 129 New copy of OtherNotebook in path
130 130 """
131 131
132 132 cm = self.contents_manager
133 133
134 134 if cm.file_exists(path):
135 135 raise web.HTTPError(400, "Cannot POST to files, use PUT instead.")
136 136
137 137 if not cm.dir_exists(path):
138 138 raise web.HTTPError(404, "No such directory: %s" % path)
139 139
140 140 model = self.get_json_body()
141 141
142 142 if model is not None:
143 143 copy_from = model.get('copy_from')
144 144 ext = model.get('ext', '')
145 145 type = model.get('type', '')
146 146 if copy_from:
147 147 self._copy(copy_from, path)
148 148 else:
149 149 self._new_untitled(path, type=type, ext=ext)
150 150 else:
151 151 self._new_untitled(path)
152 152
153 153 @web.authenticated
154 154 @json_errors
155 155 def put(self, path=''):
156 156 """Saves the file in the location specified by name and path.
157 157
158 158 PUT is very similar to POST, but the requester specifies the name,
159 159 whereas with POST, the server picks the name.
160 160
161 161 PUT /api/contents/path/Name.ipynb
162 162 Save notebook at ``path/Name.ipynb``. Notebook structure is specified
163 163 in `content` key of JSON request body. If content is not specified,
164 164 create a new empty notebook.
165 165 """
166 166 model = self.get_json_body()
167 167 if model:
168 168 if model.get('copy_from'):
169 169 raise web.HTTPError(400, "Cannot copy with PUT, only POST")
170 170 if self.contents_manager.file_exists(path):
171 171 self._save(model, path)
172 172 else:
173 173 self._upload(model, path)
174 174 else:
175 175 self._new_untitled(path)
176 176
177 177 @web.authenticated
178 178 @json_errors
179 179 def delete(self, path=''):
180 180 """delete a file in the given path"""
181 181 cm = self.contents_manager
182 182 self.log.warn('delete %s', path)
183 183 cm.delete(path)
184 184 self.set_status(204)
185 185 self.finish()
186 186
187 187
188 188 class CheckpointsHandler(IPythonHandler):
189 189
190 190 SUPPORTED_METHODS = ('GET', 'POST')
191 191
192 192 @web.authenticated
193 193 @json_errors
194 194 def get(self, path=''):
195 195 """get lists checkpoints for a file"""
196 196 cm = self.contents_manager
197 197 checkpoints = cm.list_checkpoints(path)
198 198 data = json.dumps(checkpoints, default=date_default)
199 199 self.finish(data)
200 200
201 201 @web.authenticated
202 202 @json_errors
203 203 def post(self, path=''):
204 204 """post creates a new checkpoint"""
205 205 cm = self.contents_manager
206 206 checkpoint = cm.create_checkpoint(path)
207 207 data = json.dumps(checkpoint, default=date_default)
208 208 location = url_path_join(self.base_url, 'api/contents',
209 209 path, 'checkpoints', checkpoint['id'])
210 210 self.set_header('Location', url_escape(location))
211 211 self.set_status(201)
212 212 self.finish(data)
213 213
214 214
215 215 class ModifyCheckpointsHandler(IPythonHandler):
216 216
217 217 SUPPORTED_METHODS = ('POST', 'DELETE')
218 218
219 219 @web.authenticated
220 220 @json_errors
221 221 def post(self, path, checkpoint_id):
222 222 """post restores a file from a checkpoint"""
223 223 cm = self.contents_manager
224 224 cm.restore_checkpoint(checkpoint_id, path)
225 225 self.set_status(204)
226 226 self.finish()
227 227
228 228 @web.authenticated
229 229 @json_errors
230 230 def delete(self, path, checkpoint_id):
231 231 """delete clears a checkpoint for a given file"""
232 232 cm = self.contents_manager
233 233 cm.delete_checkpoint(checkpoint_id, path)
234 234 self.set_status(204)
235 235 self.finish()
236 236
237 237
238 238 class NotebooksRedirectHandler(IPythonHandler):
239 239 """Redirect /api/notebooks to /api/contents"""
240 240 SUPPORTED_METHODS = ('GET', 'PUT', 'PATCH', 'POST', 'DELETE')
241 241
242 242 def get(self, path):
243 243 self.log.warn("/api/notebooks is deprecated, use /api/contents")
244 244 self.redirect(url_path_join(
245 245 self.base_url,
246 246 'api/contents',
247 247 path
248 248 ))
249 249
250 250 put = patch = post = delete = get
251 251
252 252
253 253 #-----------------------------------------------------------------------------
254 254 # URL to handler mappings
255 255 #-----------------------------------------------------------------------------
256 256
257 257
258 258 _checkpoint_id_regex = r"(?P<checkpoint_id>[\w-]+)"
259 259
260 260 default_handlers = [
261 261 (r"/api/contents%s/checkpoints" % path_regex, CheckpointsHandler),
262 262 (r"/api/contents%s/checkpoints/%s" % (path_regex, _checkpoint_id_regex),
263 263 ModifyCheckpointsHandler),
264 264 (r"/api/contents%s" % path_regex, ContentsHandler),
265 265 (r"/api/notebooks/?(.*)", NotebooksRedirectHandler),
266 266 ]
@@ -1,384 +1,384 b''
1 1 """A base class for contents managers."""
2 2
3 3 # Copyright (c) IPython Development Team.
4 4 # Distributed under the terms of the Modified BSD License.
5 5
6 6 from fnmatch import fnmatch
7 7 import itertools
8 8 import json
9 9 import os
10 10 import re
11 11
12 12 from tornado.web import HTTPError
13 13
14 14 from IPython.config.configurable import LoggingConfigurable
15 15 from IPython.nbformat import sign, validate, ValidationError
16 16 from IPython.nbformat.v4 import new_notebook
17 17 from IPython.utils.traitlets import Instance, Unicode, List
18 18
19 19 copy_pat = re.compile(r'\-Copy\d*\.')
20 20
21 21 class ContentsManager(LoggingConfigurable):
22 22 """Base class for serving files and directories.
23 23
24 24 This serves any text or binary file,
25 25 as well as directories,
26 26 with special handling for JSON notebook documents.
27 27
28 28 Most APIs take a path argument,
29 29 which is always an API-style unicode path,
30 30 and always refers to a directory.
31 31
32 32 - unicode, not url-escaped
33 33 - '/'-separated
34 34 - leading and trailing '/' will be stripped
35 35 - if unspecified, path defaults to '',
36 36 indicating the root path.
37 37
38 38 """
39 39
40 40 notary = Instance(sign.NotebookNotary)
41 41 def _notary_default(self):
42 42 return sign.NotebookNotary(parent=self)
43 43
44 44 hide_globs = List(Unicode, [
45 45 u'__pycache__', '*.pyc', '*.pyo',
46 46 '.DS_Store', '*.so', '*.dylib', '*~',
47 47 ], config=True, help="""
48 48 Glob patterns to hide in file and directory listings.
49 49 """)
50 50
51 51 untitled_notebook = Unicode("Untitled", config=True,
52 52 help="The base name used when creating untitled notebooks."
53 53 )
54 54
55 55 untitled_file = Unicode("untitled", config=True,
56 56 help="The base name used when creating untitled files."
57 57 )
58 58
59 59 untitled_directory = Unicode("Untitled Folder", config=True,
60 60 help="The base name used when creating untitled directories."
61 61 )
62 62
63 63 # ContentsManager API part 1: methods that must be
64 64 # implemented in subclasses.
65 65
66 66 def dir_exists(self, path):
67 67 """Does the API-style path (directory) actually exist?
68 68
69 69 Like os.path.isdir
70 70
71 71 Override this method in subclasses.
72 72
73 73 Parameters
74 74 ----------
75 75 path : string
76 76 The path to check
77 77
78 78 Returns
79 79 -------
80 80 exists : bool
81 81 Whether the path does indeed exist.
82 82 """
83 83 raise NotImplementedError
84 84
85 85 def is_hidden(self, path):
86 86 """Does the API style path correspond to a hidden directory or file?
87 87
88 88 Parameters
89 89 ----------
90 90 path : string
91 91 The path to check. This is an API path (`/` separated,
92 92 relative to root dir).
93 93
94 94 Returns
95 95 -------
96 96 hidden : bool
97 97 Whether the path is hidden.
98 98
99 99 """
100 100 raise NotImplementedError
101 101
102 102 def file_exists(self, path=''):
103 103 """Does a file exist at the given path?
104 104
105 105 Like os.path.isfile
106 106
107 107 Override this method in subclasses.
108 108
109 109 Parameters
110 110 ----------
111 111 name : string
112 112 The name of the file you are checking.
113 113 path : string
114 114 The relative path to the file's directory (with '/' as separator)
115 115
116 116 Returns
117 117 -------
118 118 exists : bool
119 119 Whether the file exists.
120 120 """
121 121 raise NotImplementedError('must be implemented in a subclass')
122 122
123 123 def exists(self, path):
124 124 """Does a file or directory exist at the given path?
125 125
126 126 Like os.path.exists
127 127
128 128 Parameters
129 129 ----------
130 130 path : string
131 131 The relative path to the file's directory (with '/' as separator)
132 132
133 133 Returns
134 134 -------
135 135 exists : bool
136 136 Whether the target exists.
137 137 """
138 138 return self.file_exists(path) or self.dir_exists(path)
139 139
140 def get(self, path, content=True, type_=None, format=None):
140 def get(self, path, content=True, type=None, format=None):
141 141 """Get the model of a file or directory with or without content."""
142 142 raise NotImplementedError('must be implemented in a subclass')
143 143
144 144 def save(self, model, path):
145 145 """Save the file or directory and return the model with no content."""
146 146 raise NotImplementedError('must be implemented in a subclass')
147 147
148 148 def update(self, model, path):
149 149 """Update the file or directory and return the model with no content.
150 150
151 151 For use in PATCH requests, to enable renaming a file without
152 152 re-uploading its contents. Only used for renaming at the moment.
153 153 """
154 154 raise NotImplementedError('must be implemented in a subclass')
155 155
156 156 def delete(self, path):
157 157 """Delete file or directory by path."""
158 158 raise NotImplementedError('must be implemented in a subclass')
159 159
160 160 def create_checkpoint(self, path):
161 161 """Create a checkpoint of the current state of a file
162 162
163 163 Returns a checkpoint_id for the new checkpoint.
164 164 """
165 165 raise NotImplementedError("must be implemented in a subclass")
166 166
167 167 def list_checkpoints(self, path):
168 168 """Return a list of checkpoints for a given file"""
169 169 return []
170 170
171 171 def restore_checkpoint(self, checkpoint_id, path):
172 172 """Restore a file from one of its checkpoints"""
173 173 raise NotImplementedError("must be implemented in a subclass")
174 174
175 175 def delete_checkpoint(self, checkpoint_id, path):
176 176 """delete a checkpoint for a file"""
177 177 raise NotImplementedError("must be implemented in a subclass")
178 178
179 179 # ContentsManager API part 2: methods that have useable default
180 180 # implementations, but can be overridden in subclasses.
181 181
182 182 def info_string(self):
183 183 return "Serving contents"
184 184
185 185 def get_kernel_path(self, path, model=None):
186 186 """Return the API path for the kernel
187 187
188 188 KernelManagers can turn this value into a filesystem path,
189 189 or ignore it altogether.
190 190
191 191 The default value here will start kernels in the directory of the
192 192 notebook server. FileContentsManager overrides this to use the
193 193 directory containing the notebook.
194 194 """
195 195 return ''
196 196
197 197 def increment_filename(self, filename, path='', insert=''):
198 198 """Increment a filename until it is unique.
199 199
200 200 Parameters
201 201 ----------
202 202 filename : unicode
203 203 The name of a file, including extension
204 204 path : unicode
205 205 The API path of the target's directory
206 206
207 207 Returns
208 208 -------
209 209 name : unicode
210 210 A filename that is unique, based on the input filename.
211 211 """
212 212 path = path.strip('/')
213 213 basename, ext = os.path.splitext(filename)
214 214 for i in itertools.count():
215 215 if i:
216 216 insert_i = '{}{}'.format(insert, i)
217 217 else:
218 218 insert_i = ''
219 219 name = u'{basename}{insert}{ext}'.format(basename=basename,
220 220 insert=insert_i, ext=ext)
221 221 if not self.exists(u'{}/{}'.format(path, name)):
222 222 break
223 223 return name
224 224
225 225 def validate_notebook_model(self, model):
226 226 """Add failed-validation message to model"""
227 227 try:
228 228 validate(model['content'])
229 229 except ValidationError as e:
230 230 model['message'] = u'Notebook Validation failed: {}:\n{}'.format(
231 231 e.message, json.dumps(e.instance, indent=1, default=lambda obj: '<UNKNOWN>'),
232 232 )
233 233 return model
234 234
235 235 def new_untitled(self, path='', type='', ext=''):
236 236 """Create a new untitled file or directory in path
237 237
238 238 path must be a directory
239 239
240 240 File extension can be specified.
241 241
242 242 Use `new` to create files with a fully specified path (including filename).
243 243 """
244 244 path = path.strip('/')
245 245 if not self.dir_exists(path):
246 246 raise HTTPError(404, 'No such directory: %s' % path)
247 247
248 248 model = {}
249 249 if type:
250 250 model['type'] = type
251 251
252 252 if ext == '.ipynb':
253 253 model.setdefault('type', 'notebook')
254 254 else:
255 255 model.setdefault('type', 'file')
256 256
257 257 insert = ''
258 258 if model['type'] == 'directory':
259 259 untitled = self.untitled_directory
260 260 insert = ' '
261 261 elif model['type'] == 'notebook':
262 262 untitled = self.untitled_notebook
263 263 ext = '.ipynb'
264 264 elif model['type'] == 'file':
265 265 untitled = self.untitled_file
266 266 else:
267 267 raise HTTPError(400, "Unexpected model type: %r" % model['type'])
268 268
269 269 name = self.increment_filename(untitled + ext, path, insert=insert)
270 270 path = u'{0}/{1}'.format(path, name)
271 271 return self.new(model, path)
272 272
273 273 def new(self, model=None, path=''):
274 274 """Create a new file or directory and return its model with no content.
275 275
276 276 To create a new untitled entity in a directory, use `new_untitled`.
277 277 """
278 278 path = path.strip('/')
279 279 if model is None:
280 280 model = {}
281 281
282 282 if path.endswith('.ipynb'):
283 283 model.setdefault('type', 'notebook')
284 284 else:
285 285 model.setdefault('type', 'file')
286 286
287 287 # no content, not a directory, so fill out new-file model
288 288 if 'content' not in model and model['type'] != 'directory':
289 289 if model['type'] == 'notebook':
290 290 model['content'] = new_notebook()
291 291 model['format'] = 'json'
292 292 else:
293 293 model['content'] = ''
294 294 model['type'] = 'file'
295 295 model['format'] = 'text'
296 296
297 297 model = self.save(model, path)
298 298 return model
299 299
300 300 def copy(self, from_path, to_path=None):
301 301 """Copy an existing file and return its new model.
302 302
303 303 If to_path not specified, it will be the parent directory of from_path.
304 304 If to_path is a directory, filename will increment `from_path-Copy#.ext`.
305 305
306 306 from_path must be a full path to a file.
307 307 """
308 308 path = from_path.strip('/')
309 309 if '/' in path:
310 310 from_dir, from_name = path.rsplit('/', 1)
311 311 else:
312 312 from_dir = ''
313 313 from_name = path
314 314
315 315 model = self.get(path)
316 316 model.pop('path', None)
317 317 model.pop('name', None)
318 318 if model['type'] == 'directory':
319 319 raise HTTPError(400, "Can't copy directories")
320 320
321 321 if not to_path:
322 322 to_path = from_dir
323 323 if self.dir_exists(to_path):
324 324 name = copy_pat.sub(u'.', from_name)
325 325 to_name = self.increment_filename(name, to_path, insert='-Copy')
326 326 to_path = u'{0}/{1}'.format(to_path, to_name)
327 327
328 328 model = self.save(model, to_path)
329 329 return model
330 330
331 331 def log_info(self):
332 332 self.log.info(self.info_string())
333 333
334 334 def trust_notebook(self, path):
335 335 """Explicitly trust a notebook
336 336
337 337 Parameters
338 338 ----------
339 339 path : string
340 340 The path of a notebook
341 341 """
342 342 model = self.get(path)
343 343 nb = model['content']
344 344 self.log.warn("Trusting notebook %s", path)
345 345 self.notary.mark_cells(nb, True)
346 346 self.save(model, path)
347 347
348 348 def check_and_sign(self, nb, path=''):
349 349 """Check for trusted cells, and sign the notebook.
350 350
351 351 Called as a part of saving notebooks.
352 352
353 353 Parameters
354 354 ----------
355 355 nb : dict
356 356 The notebook dict
357 357 path : string
358 358 The notebook's path (for logging)
359 359 """
360 360 if self.notary.check_cells(nb):
361 361 self.notary.sign(nb)
362 362 else:
363 363 self.log.warn("Saving untrusted notebook %s", path)
364 364
365 365 def mark_trusted_cells(self, nb, path=''):
366 366 """Mark cells as trusted if the notebook signature matches.
367 367
368 368 Called as a part of loading notebooks.
369 369
370 370 Parameters
371 371 ----------
372 372 nb : dict
373 373 The notebook object (in current nbformat)
374 374 path : string
375 375 The notebook's path (for logging)
376 376 """
377 377 trusted = self.notary.check_signature(nb)
378 378 if not trusted:
379 379 self.log.warn("Notebook %s is not trusted", path)
380 380 self.notary.mark_cells(nb, trusted)
381 381
382 382 def should_list(self, name):
383 383 """Should this file/directory name be displayed in a listing?"""
384 384 return not any(fnmatch(name, glob) for glob in self.hide_globs)
@@ -1,521 +1,521 b''
1 1 # coding: utf-8
2 2 """Test the contents webservice API."""
3 3
4 4 import base64
5 5 import io
6 6 import json
7 7 import os
8 8 import shutil
9 9 from unicodedata import normalize
10 10
11 11 pjoin = os.path.join
12 12
13 13 import requests
14 14
15 15 from IPython.html.utils import url_path_join, url_escape
16 16 from IPython.html.tests.launchnotebook import NotebookTestBase, assert_http_error
17 17 from IPython.nbformat import read, write, from_dict
18 18 from IPython.nbformat.v4 import (
19 19 new_notebook, new_markdown_cell,
20 20 )
21 21 from IPython.nbformat import v2
22 22 from IPython.utils import py3compat
23 23 from IPython.utils.data import uniq_stable
24 24
25 25
26 26 def notebooks_only(dir_model):
27 27 return [nb for nb in dir_model['content'] if nb['type']=='notebook']
28 28
29 29 def dirs_only(dir_model):
30 30 return [x for x in dir_model['content'] if x['type']=='directory']
31 31
32 32
33 33 class API(object):
34 34 """Wrapper for contents API calls."""
35 35 def __init__(self, base_url):
36 36 self.base_url = base_url
37 37
38 38 def _req(self, verb, path, body=None, params=None):
39 39 response = requests.request(verb,
40 40 url_path_join(self.base_url, 'api/contents', path),
41 41 data=body, params=params,
42 42 )
43 43 response.raise_for_status()
44 44 return response
45 45
46 46 def list(self, path='/'):
47 47 return self._req('GET', path)
48 48
49 def read(self, path, type_=None, format=None):
49 def read(self, path, type=None, format=None):
50 50 params = {}
51 if type_ is not None:
52 params['type'] = type_
51 if type is not None:
52 params['type'] = type
53 53 if format is not None:
54 54 params['format'] = format
55 55 return self._req('GET', path, params=params)
56 56
57 57 def create_untitled(self, path='/', ext='.ipynb'):
58 58 body = None
59 59 if ext:
60 60 body = json.dumps({'ext': ext})
61 61 return self._req('POST', path, body)
62 62
63 63 def mkdir_untitled(self, path='/'):
64 64 return self._req('POST', path, json.dumps({'type': 'directory'}))
65 65
66 66 def copy(self, copy_from, path='/'):
67 67 body = json.dumps({'copy_from':copy_from})
68 68 return self._req('POST', path, body)
69 69
70 70 def create(self, path='/'):
71 71 return self._req('PUT', path)
72 72
73 73 def upload(self, path, body):
74 74 return self._req('PUT', path, body)
75 75
76 76 def mkdir_untitled(self, path='/'):
77 77 return self._req('POST', path, json.dumps({'type': 'directory'}))
78 78
79 79 def mkdir(self, path='/'):
80 80 return self._req('PUT', path, json.dumps({'type': 'directory'}))
81 81
82 82 def copy_put(self, copy_from, path='/'):
83 83 body = json.dumps({'copy_from':copy_from})
84 84 return self._req('PUT', path, body)
85 85
86 86 def save(self, path, body):
87 87 return self._req('PUT', path, body)
88 88
89 89 def delete(self, path='/'):
90 90 return self._req('DELETE', path)
91 91
92 92 def rename(self, path, new_path):
93 93 body = json.dumps({'path': new_path})
94 94 return self._req('PATCH', path, body)
95 95
96 96 def get_checkpoints(self, path):
97 97 return self._req('GET', url_path_join(path, 'checkpoints'))
98 98
99 99 def new_checkpoint(self, path):
100 100 return self._req('POST', url_path_join(path, 'checkpoints'))
101 101
102 102 def restore_checkpoint(self, path, checkpoint_id):
103 103 return self._req('POST', url_path_join(path, 'checkpoints', checkpoint_id))
104 104
105 105 def delete_checkpoint(self, path, checkpoint_id):
106 106 return self._req('DELETE', url_path_join(path, 'checkpoints', checkpoint_id))
107 107
108 108 class APITest(NotebookTestBase):
109 109 """Test the kernels web service API"""
110 110 dirs_nbs = [('', 'inroot'),
111 111 ('Directory with spaces in', 'inspace'),
112 112 (u'unicodΓ©', 'innonascii'),
113 113 ('foo', 'a'),
114 114 ('foo', 'b'),
115 115 ('foo', 'name with spaces'),
116 116 ('foo', u'unicodΓ©'),
117 117 ('foo/bar', 'baz'),
118 118 ('ordering', 'A'),
119 119 ('ordering', 'b'),
120 120 ('ordering', 'C'),
121 121 (u'Γ₯ b', u'Γ§ d'),
122 122 ]
123 123 hidden_dirs = ['.hidden', '__pycache__']
124 124
125 125 dirs = uniq_stable([py3compat.cast_unicode(d) for (d,n) in dirs_nbs])
126 126 del dirs[0] # remove ''
127 127 top_level_dirs = {normalize('NFC', d.split('/')[0]) for d in dirs}
128 128
129 129 @staticmethod
130 130 def _blob_for_name(name):
131 131 return name.encode('utf-8') + b'\xFF'
132 132
133 133 @staticmethod
134 134 def _txt_for_name(name):
135 135 return u'%s text file' % name
136 136
137 137 def setUp(self):
138 138 nbdir = self.notebook_dir.name
139 139 self.blob = os.urandom(100)
140 140 self.b64_blob = base64.encodestring(self.blob).decode('ascii')
141 141
142 142 for d in (self.dirs + self.hidden_dirs):
143 143 d.replace('/', os.sep)
144 144 if not os.path.isdir(pjoin(nbdir, d)):
145 145 os.mkdir(pjoin(nbdir, d))
146 146
147 147 for d, name in self.dirs_nbs:
148 148 d = d.replace('/', os.sep)
149 149 # create a notebook
150 150 with io.open(pjoin(nbdir, d, '%s.ipynb' % name), 'w',
151 151 encoding='utf-8') as f:
152 152 nb = new_notebook()
153 153 write(nb, f, version=4)
154 154
155 155 # create a text file
156 156 with io.open(pjoin(nbdir, d, '%s.txt' % name), 'w',
157 157 encoding='utf-8') as f:
158 158 f.write(self._txt_for_name(name))
159 159
160 160 # create a binary file
161 161 with io.open(pjoin(nbdir, d, '%s.blob' % name), 'wb') as f:
162 162 f.write(self._blob_for_name(name))
163 163
164 164 self.api = API(self.base_url())
165 165
166 166 def tearDown(self):
167 167 nbdir = self.notebook_dir.name
168 168
169 169 for dname in (list(self.top_level_dirs) + self.hidden_dirs):
170 170 shutil.rmtree(pjoin(nbdir, dname), ignore_errors=True)
171 171
172 172 if os.path.isfile(pjoin(nbdir, 'inroot.ipynb')):
173 173 os.unlink(pjoin(nbdir, 'inroot.ipynb'))
174 174
175 175 def test_list_notebooks(self):
176 176 nbs = notebooks_only(self.api.list().json())
177 177 self.assertEqual(len(nbs), 1)
178 178 self.assertEqual(nbs[0]['name'], 'inroot.ipynb')
179 179
180 180 nbs = notebooks_only(self.api.list('/Directory with spaces in/').json())
181 181 self.assertEqual(len(nbs), 1)
182 182 self.assertEqual(nbs[0]['name'], 'inspace.ipynb')
183 183
184 184 nbs = notebooks_only(self.api.list(u'/unicodΓ©/').json())
185 185 self.assertEqual(len(nbs), 1)
186 186 self.assertEqual(nbs[0]['name'], 'innonascii.ipynb')
187 187 self.assertEqual(nbs[0]['path'], u'unicodΓ©/innonascii.ipynb')
188 188
189 189 nbs = notebooks_only(self.api.list('/foo/bar/').json())
190 190 self.assertEqual(len(nbs), 1)
191 191 self.assertEqual(nbs[0]['name'], 'baz.ipynb')
192 192 self.assertEqual(nbs[0]['path'], 'foo/bar/baz.ipynb')
193 193
194 194 nbs = notebooks_only(self.api.list('foo').json())
195 195 self.assertEqual(len(nbs), 4)
196 196 nbnames = { normalize('NFC', n['name']) for n in nbs }
197 197 expected = [ u'a.ipynb', u'b.ipynb', u'name with spaces.ipynb', u'unicodΓ©.ipynb']
198 198 expected = { normalize('NFC', name) for name in expected }
199 199 self.assertEqual(nbnames, expected)
200 200
201 201 nbs = notebooks_only(self.api.list('ordering').json())
202 202 nbnames = [n['name'] for n in nbs]
203 203 expected = ['A.ipynb', 'b.ipynb', 'C.ipynb']
204 204 self.assertEqual(nbnames, expected)
205 205
206 206 def test_list_dirs(self):
207 207 print(self.api.list().json())
208 208 dirs = dirs_only(self.api.list().json())
209 209 dir_names = {normalize('NFC', d['name']) for d in dirs}
210 210 print(dir_names)
211 211 print(self.top_level_dirs)
212 212 self.assertEqual(dir_names, self.top_level_dirs) # Excluding hidden dirs
213 213
214 214 def test_list_nonexistant_dir(self):
215 215 with assert_http_error(404):
216 216 self.api.list('nonexistant')
217 217
218 218 def test_get_nb_contents(self):
219 219 for d, name in self.dirs_nbs:
220 220 path = url_path_join(d, name + '.ipynb')
221 221 nb = self.api.read(path).json()
222 222 self.assertEqual(nb['name'], u'%s.ipynb' % name)
223 223 self.assertEqual(nb['path'], path)
224 224 self.assertEqual(nb['type'], 'notebook')
225 225 self.assertIn('content', nb)
226 226 self.assertEqual(nb['format'], 'json')
227 227 self.assertIn('content', nb)
228 228 self.assertIn('metadata', nb['content'])
229 229 self.assertIsInstance(nb['content']['metadata'], dict)
230 230
231 231 def test_get_contents_no_such_file(self):
232 232 # Name that doesn't exist - should be a 404
233 233 with assert_http_error(404):
234 234 self.api.read('foo/q.ipynb')
235 235
236 236 def test_get_text_file_contents(self):
237 237 for d, name in self.dirs_nbs:
238 238 path = url_path_join(d, name + '.txt')
239 239 model = self.api.read(path).json()
240 240 self.assertEqual(model['name'], u'%s.txt' % name)
241 241 self.assertEqual(model['path'], path)
242 242 self.assertIn('content', model)
243 243 self.assertEqual(model['format'], 'text')
244 244 self.assertEqual(model['type'], 'file')
245 245 self.assertEqual(model['content'], self._txt_for_name(name))
246 246
247 247 # Name that doesn't exist - should be a 404
248 248 with assert_http_error(404):
249 249 self.api.read('foo/q.txt')
250 250
251 251 # Specifying format=text should fail on a non-UTF-8 file
252 252 with assert_http_error(400):
253 self.api.read('foo/bar/baz.blob', type_='file', format='text')
253 self.api.read('foo/bar/baz.blob', type='file', format='text')
254 254
255 255 def test_get_binary_file_contents(self):
256 256 for d, name in self.dirs_nbs:
257 257 path = url_path_join(d, name + '.blob')
258 258 model = self.api.read(path).json()
259 259 self.assertEqual(model['name'], u'%s.blob' % name)
260 260 self.assertEqual(model['path'], path)
261 261 self.assertIn('content', model)
262 262 self.assertEqual(model['format'], 'base64')
263 263 self.assertEqual(model['type'], 'file')
264 264 b64_data = base64.encodestring(self._blob_for_name(name)).decode('ascii')
265 265 self.assertEqual(model['content'], b64_data)
266 266
267 267 # Name that doesn't exist - should be a 404
268 268 with assert_http_error(404):
269 269 self.api.read('foo/q.txt')
270 270
271 271 def test_get_bad_type(self):
272 272 with assert_http_error(400):
273 self.api.read(u'unicodΓ©', type_='file') # this is a directory
273 self.api.read(u'unicodΓ©', type='file') # this is a directory
274 274
275 275 with assert_http_error(400):
276 self.api.read(u'unicodΓ©/innonascii.ipynb', type_='directory')
276 self.api.read(u'unicodΓ©/innonascii.ipynb', type='directory')
277 277
278 278 def _check_created(self, resp, path, type='notebook'):
279 279 self.assertEqual(resp.status_code, 201)
280 280 location_header = py3compat.str_to_unicode(resp.headers['Location'])
281 281 self.assertEqual(location_header, url_escape(url_path_join(u'/api/contents', path)))
282 282 rjson = resp.json()
283 283 self.assertEqual(rjson['name'], path.rsplit('/', 1)[-1])
284 284 self.assertEqual(rjson['path'], path)
285 285 self.assertEqual(rjson['type'], type)
286 286 isright = os.path.isdir if type == 'directory' else os.path.isfile
287 287 assert isright(pjoin(
288 288 self.notebook_dir.name,
289 289 path.replace('/', os.sep),
290 290 ))
291 291
292 292 def test_create_untitled(self):
293 293 resp = self.api.create_untitled(path=u'Γ₯ b')
294 294 self._check_created(resp, u'Γ₯ b/Untitled.ipynb')
295 295
296 296 # Second time
297 297 resp = self.api.create_untitled(path=u'Γ₯ b')
298 298 self._check_created(resp, u'Γ₯ b/Untitled1.ipynb')
299 299
300 300 # And two directories down
301 301 resp = self.api.create_untitled(path='foo/bar')
302 302 self._check_created(resp, 'foo/bar/Untitled.ipynb')
303 303
304 304 def test_create_untitled_txt(self):
305 305 resp = self.api.create_untitled(path='foo/bar', ext='.txt')
306 306 self._check_created(resp, 'foo/bar/untitled.txt', type='file')
307 307
308 308 resp = self.api.read(path='foo/bar/untitled.txt')
309 309 model = resp.json()
310 310 self.assertEqual(model['type'], 'file')
311 311 self.assertEqual(model['format'], 'text')
312 312 self.assertEqual(model['content'], '')
313 313
314 314 def test_upload(self):
315 315 nb = new_notebook()
316 316 nbmodel = {'content': nb, 'type': 'notebook'}
317 317 path = u'Γ₯ b/Upload tΓ©st.ipynb'
318 318 resp = self.api.upload(path, body=json.dumps(nbmodel))
319 319 self._check_created(resp, path)
320 320
321 321 def test_mkdir_untitled(self):
322 322 resp = self.api.mkdir_untitled(path=u'Γ₯ b')
323 323 self._check_created(resp, u'Γ₯ b/Untitled Folder', type='directory')
324 324
325 325 # Second time
326 326 resp = self.api.mkdir_untitled(path=u'Γ₯ b')
327 327 self._check_created(resp, u'Γ₯ b/Untitled Folder 1', type='directory')
328 328
329 329 # And two directories down
330 330 resp = self.api.mkdir_untitled(path='foo/bar')
331 331 self._check_created(resp, 'foo/bar/Untitled Folder', type='directory')
332 332
333 333 def test_mkdir(self):
334 334 path = u'Γ₯ b/New βˆ‚ir'
335 335 resp = self.api.mkdir(path)
336 336 self._check_created(resp, path, type='directory')
337 337
338 338 def test_mkdir_hidden_400(self):
339 339 with assert_http_error(400):
340 340 resp = self.api.mkdir(u'Γ₯ b/.hidden')
341 341
342 342 def test_upload_txt(self):
343 343 body = u'ΓΌnicode tΓ©xt'
344 344 model = {
345 345 'content' : body,
346 346 'format' : 'text',
347 347 'type' : 'file',
348 348 }
349 349 path = u'Γ₯ b/Upload tΓ©st.txt'
350 350 resp = self.api.upload(path, body=json.dumps(model))
351 351
352 352 # check roundtrip
353 353 resp = self.api.read(path)
354 354 model = resp.json()
355 355 self.assertEqual(model['type'], 'file')
356 356 self.assertEqual(model['format'], 'text')
357 357 self.assertEqual(model['content'], body)
358 358
359 359 def test_upload_b64(self):
360 360 body = b'\xFFblob'
361 361 b64body = base64.encodestring(body).decode('ascii')
362 362 model = {
363 363 'content' : b64body,
364 364 'format' : 'base64',
365 365 'type' : 'file',
366 366 }
367 367 path = u'Γ₯ b/Upload tΓ©st.blob'
368 368 resp = self.api.upload(path, body=json.dumps(model))
369 369
370 370 # check roundtrip
371 371 resp = self.api.read(path)
372 372 model = resp.json()
373 373 self.assertEqual(model['type'], 'file')
374 374 self.assertEqual(model['path'], path)
375 375 self.assertEqual(model['format'], 'base64')
376 376 decoded = base64.decodestring(model['content'].encode('ascii'))
377 377 self.assertEqual(decoded, body)
378 378
379 379 def test_upload_v2(self):
380 380 nb = v2.new_notebook()
381 381 ws = v2.new_worksheet()
382 382 nb.worksheets.append(ws)
383 383 ws.cells.append(v2.new_code_cell(input='print("hi")'))
384 384 nbmodel = {'content': nb, 'type': 'notebook'}
385 385 path = u'Γ₯ b/Upload tΓ©st.ipynb'
386 386 resp = self.api.upload(path, body=json.dumps(nbmodel))
387 387 self._check_created(resp, path)
388 388 resp = self.api.read(path)
389 389 data = resp.json()
390 390 self.assertEqual(data['content']['nbformat'], 4)
391 391
392 392 def test_copy(self):
393 393 resp = self.api.copy(u'Γ₯ b/Γ§ d.ipynb', u'Γ₯ b')
394 394 self._check_created(resp, u'Γ₯ b/Γ§ d-Copy1.ipynb')
395 395
396 396 resp = self.api.copy(u'Γ₯ b/Γ§ d.ipynb', u'Γ₯ b')
397 397 self._check_created(resp, u'Γ₯ b/Γ§ d-Copy2.ipynb')
398 398
399 399 def test_copy_copy(self):
400 400 resp = self.api.copy(u'Γ₯ b/Γ§ d.ipynb', u'Γ₯ b')
401 401 self._check_created(resp, u'Γ₯ b/Γ§ d-Copy1.ipynb')
402 402
403 403 resp = self.api.copy(u'Γ₯ b/Γ§ d-Copy1.ipynb', u'Γ₯ b')
404 404 self._check_created(resp, u'Γ₯ b/Γ§ d-Copy2.ipynb')
405 405
406 406 def test_copy_path(self):
407 407 resp = self.api.copy(u'foo/a.ipynb', u'Γ₯ b')
408 408 self._check_created(resp, u'Γ₯ b/a.ipynb')
409 409
410 410 resp = self.api.copy(u'foo/a.ipynb', u'Γ₯ b')
411 411 self._check_created(resp, u'Γ₯ b/a-Copy1.ipynb')
412 412
413 413 def test_copy_put_400(self):
414 414 with assert_http_error(400):
415 415 resp = self.api.copy_put(u'Γ₯ b/Γ§ d.ipynb', u'Γ₯ b/cΓΈpy.ipynb')
416 416
417 417 def test_copy_dir_400(self):
418 418 # can't copy directories
419 419 with assert_http_error(400):
420 420 resp = self.api.copy(u'Γ₯ b', u'foo')
421 421
422 422 def test_delete(self):
423 423 for d, name in self.dirs_nbs:
424 424 print('%r, %r' % (d, name))
425 425 resp = self.api.delete(url_path_join(d, name + '.ipynb'))
426 426 self.assertEqual(resp.status_code, 204)
427 427
428 428 for d in self.dirs + ['/']:
429 429 nbs = notebooks_only(self.api.list(d).json())
430 430 print('------')
431 431 print(d)
432 432 print(nbs)
433 433 self.assertEqual(nbs, [])
434 434
435 435 def test_delete_dirs(self):
436 436 # depth-first delete everything, so we don't try to delete empty directories
437 437 for name in sorted(self.dirs + ['/'], key=len, reverse=True):
438 438 listing = self.api.list(name).json()['content']
439 439 for model in listing:
440 440 self.api.delete(model['path'])
441 441 listing = self.api.list('/').json()['content']
442 442 self.assertEqual(listing, [])
443 443
444 444 def test_delete_non_empty_dir(self):
445 445 """delete non-empty dir raises 400"""
446 446 with assert_http_error(400):
447 447 self.api.delete(u'Γ₯ b')
448 448
449 449 def test_rename(self):
450 450 resp = self.api.rename('foo/a.ipynb', 'foo/z.ipynb')
451 451 self.assertEqual(resp.headers['Location'].split('/')[-1], 'z.ipynb')
452 452 self.assertEqual(resp.json()['name'], 'z.ipynb')
453 453 self.assertEqual(resp.json()['path'], 'foo/z.ipynb')
454 454 assert os.path.isfile(pjoin(self.notebook_dir.name, 'foo', 'z.ipynb'))
455 455
456 456 nbs = notebooks_only(self.api.list('foo').json())
457 457 nbnames = set(n['name'] for n in nbs)
458 458 self.assertIn('z.ipynb', nbnames)
459 459 self.assertNotIn('a.ipynb', nbnames)
460 460
461 461 def test_rename_existing(self):
462 462 with assert_http_error(409):
463 463 self.api.rename('foo/a.ipynb', 'foo/b.ipynb')
464 464
465 465 def test_save(self):
466 466 resp = self.api.read('foo/a.ipynb')
467 467 nbcontent = json.loads(resp.text)['content']
468 468 nb = from_dict(nbcontent)
469 469 nb.cells.append(new_markdown_cell(u'Created by test Β³'))
470 470
471 471 nbmodel= {'content': nb, 'type': 'notebook'}
472 472 resp = self.api.save('foo/a.ipynb', body=json.dumps(nbmodel))
473 473
474 474 nbfile = pjoin(self.notebook_dir.name, 'foo', 'a.ipynb')
475 475 with io.open(nbfile, 'r', encoding='utf-8') as f:
476 476 newnb = read(f, as_version=4)
477 477 self.assertEqual(newnb.cells[0].source,
478 478 u'Created by test Β³')
479 479 nbcontent = self.api.read('foo/a.ipynb').json()['content']
480 480 newnb = from_dict(nbcontent)
481 481 self.assertEqual(newnb.cells[0].source,
482 482 u'Created by test Β³')
483 483
484 484
485 485 def test_checkpoints(self):
486 486 resp = self.api.read('foo/a.ipynb')
487 487 r = self.api.new_checkpoint('foo/a.ipynb')
488 488 self.assertEqual(r.status_code, 201)
489 489 cp1 = r.json()
490 490 self.assertEqual(set(cp1), {'id', 'last_modified'})
491 491 self.assertEqual(r.headers['Location'].split('/')[-1], cp1['id'])
492 492
493 493 # Modify it
494 494 nbcontent = json.loads(resp.text)['content']
495 495 nb = from_dict(nbcontent)
496 496 hcell = new_markdown_cell('Created by test')
497 497 nb.cells.append(hcell)
498 498 # Save
499 499 nbmodel= {'content': nb, 'type': 'notebook'}
500 500 resp = self.api.save('foo/a.ipynb', body=json.dumps(nbmodel))
501 501
502 502 # List checkpoints
503 503 cps = self.api.get_checkpoints('foo/a.ipynb').json()
504 504 self.assertEqual(cps, [cp1])
505 505
506 506 nbcontent = self.api.read('foo/a.ipynb').json()['content']
507 507 nb = from_dict(nbcontent)
508 508 self.assertEqual(nb.cells[0].source, 'Created by test')
509 509
510 510 # Restore cp1
511 511 r = self.api.restore_checkpoint('foo/a.ipynb', cp1['id'])
512 512 self.assertEqual(r.status_code, 204)
513 513 nbcontent = self.api.read('foo/a.ipynb').json()['content']
514 514 nb = from_dict(nbcontent)
515 515 self.assertEqual(nb.cells, [])
516 516
517 517 # Delete cp1
518 518 r = self.api.delete_checkpoint('foo/a.ipynb', cp1['id'])
519 519 self.assertEqual(r.status_code, 204)
520 520 cps = self.api.get_checkpoints('foo/a.ipynb').json()
521 521 self.assertEqual(cps, [])
@@ -1,369 +1,369 b''
1 1 # coding: utf-8
2 2 """Tests for the notebook manager."""
3 3 from __future__ import print_function
4 4
5 5 import logging
6 6 import os
7 7
8 8 from tornado.web import HTTPError
9 9 from unittest import TestCase
10 10 from tempfile import NamedTemporaryFile
11 11
12 12 from IPython.nbformat import v4 as nbformat
13 13
14 14 from IPython.utils.tempdir import TemporaryDirectory
15 15 from IPython.utils.traitlets import TraitError
16 16 from IPython.html.utils import url_path_join
17 17 from IPython.testing import decorators as dec
18 18
19 19 from ..filemanager import FileContentsManager
20 20 from ..manager import ContentsManager
21 21
22 22
23 23 class TestFileContentsManager(TestCase):
24 24
25 25 def test_root_dir(self):
26 26 with TemporaryDirectory() as td:
27 27 fm = FileContentsManager(root_dir=td)
28 28 self.assertEqual(fm.root_dir, td)
29 29
30 30 def test_missing_root_dir(self):
31 31 with TemporaryDirectory() as td:
32 32 root = os.path.join(td, 'notebook', 'dir', 'is', 'missing')
33 33 self.assertRaises(TraitError, FileContentsManager, root_dir=root)
34 34
35 35 def test_invalid_root_dir(self):
36 36 with NamedTemporaryFile() as tf:
37 37 self.assertRaises(TraitError, FileContentsManager, root_dir=tf.name)
38 38
39 39 def test_get_os_path(self):
40 40 # full filesystem path should be returned with correct operating system
41 41 # separators.
42 42 with TemporaryDirectory() as td:
43 43 root = td
44 44 fm = FileContentsManager(root_dir=root)
45 45 path = fm._get_os_path('/path/to/notebook/test.ipynb')
46 46 rel_path_list = '/path/to/notebook/test.ipynb'.split('/')
47 47 fs_path = os.path.join(fm.root_dir, *rel_path_list)
48 48 self.assertEqual(path, fs_path)
49 49
50 50 fm = FileContentsManager(root_dir=root)
51 51 path = fm._get_os_path('test.ipynb')
52 52 fs_path = os.path.join(fm.root_dir, 'test.ipynb')
53 53 self.assertEqual(path, fs_path)
54 54
55 55 fm = FileContentsManager(root_dir=root)
56 56 path = fm._get_os_path('////test.ipynb')
57 57 fs_path = os.path.join(fm.root_dir, 'test.ipynb')
58 58 self.assertEqual(path, fs_path)
59 59
60 60 def test_checkpoint_subdir(self):
61 61 subd = u'sub βˆ‚ir'
62 62 cp_name = 'test-cp.ipynb'
63 63 with TemporaryDirectory() as td:
64 64 root = td
65 65 os.mkdir(os.path.join(td, subd))
66 66 fm = FileContentsManager(root_dir=root)
67 67 cp_dir = fm.get_checkpoint_path('cp', 'test.ipynb')
68 68 cp_subdir = fm.get_checkpoint_path('cp', '/%s/test.ipynb' % subd)
69 69 self.assertNotEqual(cp_dir, cp_subdir)
70 70 self.assertEqual(cp_dir, os.path.join(root, fm.checkpoint_dir, cp_name))
71 71 self.assertEqual(cp_subdir, os.path.join(root, subd, fm.checkpoint_dir, cp_name))
72 72
73 73
74 74 class TestContentsManager(TestCase):
75 75
76 76 def setUp(self):
77 77 self._temp_dir = TemporaryDirectory()
78 78 self.td = self._temp_dir.name
79 79 self.contents_manager = FileContentsManager(
80 80 root_dir=self.td,
81 81 log=logging.getLogger()
82 82 )
83 83
84 84 def tearDown(self):
85 85 self._temp_dir.cleanup()
86 86
87 87 def make_dir(self, abs_path, rel_path):
88 88 """make subdirectory, rel_path is the relative path
89 89 to that directory from the location where the server started"""
90 90 os_path = os.path.join(abs_path, rel_path)
91 91 try:
92 92 os.makedirs(os_path)
93 93 except OSError:
94 94 print("Directory already exists: %r" % os_path)
95 95 return os_path
96 96
97 97 def add_code_cell(self, nb):
98 98 output = nbformat.new_output("display_data", {'application/javascript': "alert('hi');"})
99 99 cell = nbformat.new_code_cell("print('hi')", outputs=[output])
100 100 nb.cells.append(cell)
101 101
102 102 def new_notebook(self):
103 103 cm = self.contents_manager
104 104 model = cm.new_untitled(type='notebook')
105 105 name = model['name']
106 106 path = model['path']
107 107
108 108 full_model = cm.get(path)
109 109 nb = full_model['content']
110 110 self.add_code_cell(nb)
111 111
112 112 cm.save(full_model, path)
113 113 return nb, name, path
114 114
115 115 def test_new_untitled(self):
116 116 cm = self.contents_manager
117 117 # Test in root directory
118 118 model = cm.new_untitled(type='notebook')
119 119 assert isinstance(model, dict)
120 120 self.assertIn('name', model)
121 121 self.assertIn('path', model)
122 122 self.assertIn('type', model)
123 123 self.assertEqual(model['type'], 'notebook')
124 124 self.assertEqual(model['name'], 'Untitled.ipynb')
125 125 self.assertEqual(model['path'], 'Untitled.ipynb')
126 126
127 127 # Test in sub-directory
128 128 model = cm.new_untitled(type='directory')
129 129 assert isinstance(model, dict)
130 130 self.assertIn('name', model)
131 131 self.assertIn('path', model)
132 132 self.assertIn('type', model)
133 133 self.assertEqual(model['type'], 'directory')
134 134 self.assertEqual(model['name'], 'Untitled Folder')
135 135 self.assertEqual(model['path'], 'Untitled Folder')
136 136 sub_dir = model['path']
137 137
138 138 model = cm.new_untitled(path=sub_dir)
139 139 assert isinstance(model, dict)
140 140 self.assertIn('name', model)
141 141 self.assertIn('path', model)
142 142 self.assertIn('type', model)
143 143 self.assertEqual(model['type'], 'file')
144 144 self.assertEqual(model['name'], 'untitled')
145 145 self.assertEqual(model['path'], '%s/untitled' % sub_dir)
146 146
147 147 def test_get(self):
148 148 cm = self.contents_manager
149 149 # Create a notebook
150 150 model = cm.new_untitled(type='notebook')
151 151 name = model['name']
152 152 path = model['path']
153 153
154 154 # Check that we 'get' on the notebook we just created
155 155 model2 = cm.get(path)
156 156 assert isinstance(model2, dict)
157 157 self.assertIn('name', model2)
158 158 self.assertIn('path', model2)
159 159 self.assertEqual(model['name'], name)
160 160 self.assertEqual(model['path'], path)
161 161
162 nb_as_file = cm.get(path, content=True, type_='file')
162 nb_as_file = cm.get(path, content=True, type='file')
163 163 self.assertEqual(nb_as_file['path'], path)
164 164 self.assertEqual(nb_as_file['type'], 'file')
165 165 self.assertEqual(nb_as_file['format'], 'text')
166 166 self.assertNotIsInstance(nb_as_file['content'], dict)
167 167
168 nb_as_bin_file = cm.get(path, content=True, type_='file', format='base64')
168 nb_as_bin_file = cm.get(path, content=True, type='file', format='base64')
169 169 self.assertEqual(nb_as_bin_file['format'], 'base64')
170 170
171 171 # Test in sub-directory
172 172 sub_dir = '/foo/'
173 173 self.make_dir(cm.root_dir, 'foo')
174 174 model = cm.new_untitled(path=sub_dir, ext='.ipynb')
175 175 model2 = cm.get(sub_dir + name)
176 176 assert isinstance(model2, dict)
177 177 self.assertIn('name', model2)
178 178 self.assertIn('path', model2)
179 179 self.assertIn('content', model2)
180 180 self.assertEqual(model2['name'], 'Untitled.ipynb')
181 181 self.assertEqual(model2['path'], '{0}/{1}'.format(sub_dir.strip('/'), name))
182 182
183 183 # Test getting directory model
184 184 dirmodel = cm.get('foo')
185 185 self.assertEqual(dirmodel['type'], 'directory')
186 186
187 187 with self.assertRaises(HTTPError):
188 cm.get('foo', type_='file')
188 cm.get('foo', type='file')
189 189
190 190
191 191 @dec.skip_win32
192 192 def test_bad_symlink(self):
193 193 cm = self.contents_manager
194 194 path = 'test bad symlink'
195 195 os_path = self.make_dir(cm.root_dir, path)
196 196
197 197 file_model = cm.new_untitled(path=path, ext='.txt')
198 198
199 199 # create a broken symlink
200 200 os.symlink("target", os.path.join(os_path, "bad symlink"))
201 201 model = cm.get(path)
202 202 self.assertEqual(model['content'], [file_model])
203 203
204 204 @dec.skip_win32
205 205 def test_good_symlink(self):
206 206 cm = self.contents_manager
207 207 parent = 'test good symlink'
208 208 name = 'good symlink'
209 209 path = '{0}/{1}'.format(parent, name)
210 210 os_path = self.make_dir(cm.root_dir, parent)
211 211
212 212 file_model = cm.new(path=parent + '/zfoo.txt')
213 213
214 214 # create a good symlink
215 215 os.symlink(file_model['name'], os.path.join(os_path, name))
216 216 symlink_model = cm.get(path, content=False)
217 217 dir_model = cm.get(parent)
218 218 self.assertEqual(
219 219 sorted(dir_model['content'], key=lambda x: x['name']),
220 220 [symlink_model, file_model],
221 221 )
222 222
223 223 def test_update(self):
224 224 cm = self.contents_manager
225 225 # Create a notebook
226 226 model = cm.new_untitled(type='notebook')
227 227 name = model['name']
228 228 path = model['path']
229 229
230 230 # Change the name in the model for rename
231 231 model['path'] = 'test.ipynb'
232 232 model = cm.update(model, path)
233 233 assert isinstance(model, dict)
234 234 self.assertIn('name', model)
235 235 self.assertIn('path', model)
236 236 self.assertEqual(model['name'], 'test.ipynb')
237 237
238 238 # Make sure the old name is gone
239 239 self.assertRaises(HTTPError, cm.get, path)
240 240
241 241 # Test in sub-directory
242 242 # Create a directory and notebook in that directory
243 243 sub_dir = '/foo/'
244 244 self.make_dir(cm.root_dir, 'foo')
245 245 model = cm.new_untitled(path=sub_dir, type='notebook')
246 246 name = model['name']
247 247 path = model['path']
248 248
249 249 # Change the name in the model for rename
250 250 d = path.rsplit('/', 1)[0]
251 251 new_path = model['path'] = d + '/test_in_sub.ipynb'
252 252 model = cm.update(model, path)
253 253 assert isinstance(model, dict)
254 254 self.assertIn('name', model)
255 255 self.assertIn('path', model)
256 256 self.assertEqual(model['name'], 'test_in_sub.ipynb')
257 257 self.assertEqual(model['path'], new_path)
258 258
259 259 # Make sure the old name is gone
260 260 self.assertRaises(HTTPError, cm.get, path)
261 261
262 262 def test_save(self):
263 263 cm = self.contents_manager
264 264 # Create a notebook
265 265 model = cm.new_untitled(type='notebook')
266 266 name = model['name']
267 267 path = model['path']
268 268
269 269 # Get the model with 'content'
270 270 full_model = cm.get(path)
271 271
272 272 # Save the notebook
273 273 model = cm.save(full_model, path)
274 274 assert isinstance(model, dict)
275 275 self.assertIn('name', model)
276 276 self.assertIn('path', model)
277 277 self.assertEqual(model['name'], name)
278 278 self.assertEqual(model['path'], path)
279 279
280 280 # Test in sub-directory
281 281 # Create a directory and notebook in that directory
282 282 sub_dir = '/foo/'
283 283 self.make_dir(cm.root_dir, 'foo')
284 284 model = cm.new_untitled(path=sub_dir, type='notebook')
285 285 name = model['name']
286 286 path = model['path']
287 287 model = cm.get(path)
288 288
289 289 # Change the name in the model for rename
290 290 model = cm.save(model, path)
291 291 assert isinstance(model, dict)
292 292 self.assertIn('name', model)
293 293 self.assertIn('path', model)
294 294 self.assertEqual(model['name'], 'Untitled.ipynb')
295 295 self.assertEqual(model['path'], 'foo/Untitled.ipynb')
296 296
297 297 def test_delete(self):
298 298 cm = self.contents_manager
299 299 # Create a notebook
300 300 nb, name, path = self.new_notebook()
301 301
302 302 # Delete the notebook
303 303 cm.delete(path)
304 304
305 305 # Check that a 'get' on the deleted notebook raises and error
306 306 self.assertRaises(HTTPError, cm.get, path)
307 307
308 308 def test_copy(self):
309 309 cm = self.contents_manager
310 310 parent = u'Γ₯ b'
311 311 name = u'nb √.ipynb'
312 312 path = u'{0}/{1}'.format(parent, name)
313 313 os.mkdir(os.path.join(cm.root_dir, parent))
314 314 orig = cm.new(path=path)
315 315
316 316 # copy with unspecified name
317 317 copy = cm.copy(path)
318 318 self.assertEqual(copy['name'], orig['name'].replace('.ipynb', '-Copy1.ipynb'))
319 319
320 320 # copy with specified name
321 321 copy2 = cm.copy(path, u'Γ₯ b/copy 2.ipynb')
322 322 self.assertEqual(copy2['name'], u'copy 2.ipynb')
323 323 self.assertEqual(copy2['path'], u'Γ₯ b/copy 2.ipynb')
324 324 # copy with specified path
325 325 copy2 = cm.copy(path, u'/')
326 326 self.assertEqual(copy2['name'], name)
327 327 self.assertEqual(copy2['path'], name)
328 328
329 329 def test_trust_notebook(self):
330 330 cm = self.contents_manager
331 331 nb, name, path = self.new_notebook()
332 332
333 333 untrusted = cm.get(path)['content']
334 334 assert not cm.notary.check_cells(untrusted)
335 335
336 336 # print(untrusted)
337 337 cm.trust_notebook(path)
338 338 trusted = cm.get(path)['content']
339 339 # print(trusted)
340 340 assert cm.notary.check_cells(trusted)
341 341
342 342 def test_mark_trusted_cells(self):
343 343 cm = self.contents_manager
344 344 nb, name, path = self.new_notebook()
345 345
346 346 cm.mark_trusted_cells(nb, path)
347 347 for cell in nb.cells:
348 348 if cell.cell_type == 'code':
349 349 assert not cell.metadata.trusted
350 350
351 351 cm.trust_notebook(path)
352 352 nb = cm.get(path)['content']
353 353 for cell in nb.cells:
354 354 if cell.cell_type == 'code':
355 355 assert cell.metadata.trusted
356 356
357 357 def test_check_and_sign(self):
358 358 cm = self.contents_manager
359 359 nb, name, path = self.new_notebook()
360 360
361 361 cm.mark_trusted_cells(nb, path)
362 362 cm.check_and_sign(nb, path)
363 363 assert not cm.notary.check_signature(nb)
364 364
365 365 cm.trust_notebook(path)
366 366 nb = cm.get(path)['content']
367 367 cm.mark_trusted_cells(nb, path)
368 368 cm.check_and_sign(nb, path)
369 369 assert cm.notary.check_signature(nb)
General Comments 0
You need to be logged in to leave comments. Login now