##// END OF EJS Templates
Allow specifying format when getting files from contents API
Thomas Kluyver -
Show More
@@ -1,545 +1,559 b''
1 1 """A contents manager that uses the local file system for storage."""
2 2
3 3 # Copyright (c) IPython Development Team.
4 4 # Distributed under the terms of the Modified BSD License.
5 5
6 6 import base64
7 7 import io
8 8 import os
9 9 import glob
10 10 import shutil
11 11
12 12 from tornado import web
13 13
14 14 from .manager import ContentsManager
15 15 from IPython import nbformat
16 16 from IPython.utils.io import atomic_writing
17 17 from IPython.utils.path import ensure_dir_exists
18 18 from IPython.utils.traitlets import Unicode, Bool, TraitError
19 19 from IPython.utils.py3compat import getcwd
20 20 from IPython.utils import tz
21 21 from IPython.html.utils import is_hidden, to_os_path, url_path_join
22 22
23 23
24 24 class FileContentsManager(ContentsManager):
25 25
26 26 root_dir = Unicode(getcwd(), config=True)
27 27
28 28 save_script = Bool(False, config=True, help='DEPRECATED, IGNORED')
29 29 def _save_script_changed(self):
30 30 self.log.warn("""
31 31 Automatically saving notebooks as scripts has been removed.
32 32 Use `ipython nbconvert --to python [notebook]` instead.
33 33 """)
34 34
35 35 def _root_dir_changed(self, name, old, new):
36 36 """Do a bit of validation of the root_dir."""
37 37 if not os.path.isabs(new):
38 38 # If we receive a non-absolute path, make it absolute.
39 39 self.root_dir = os.path.abspath(new)
40 40 return
41 41 if not os.path.isdir(new):
42 42 raise TraitError("%r is not a directory" % new)
43 43
44 44 checkpoint_dir = Unicode('.ipynb_checkpoints', config=True,
45 45 help="""The directory name in which to keep file checkpoints
46 46
47 47 This is a path relative to the file's own directory.
48 48
49 49 By default, it is .ipynb_checkpoints
50 50 """
51 51 )
52 52
53 53 def _copy(self, src, dest):
54 54 """copy src to dest
55 55
56 56 like shutil.copy2, but log errors in copystat
57 57 """
58 58 shutil.copyfile(src, dest)
59 59 try:
60 60 shutil.copystat(src, dest)
61 61 except OSError as e:
62 62 self.log.debug("copystat on %s failed", dest, exc_info=True)
63 63
64 64 def _get_os_path(self, path):
65 65 """Given an API path, return its file system path.
66 66
67 67 Parameters
68 68 ----------
69 69 path : string
70 70 The relative API path to the named file.
71 71
72 72 Returns
73 73 -------
74 74 path : string
75 75 Native, absolute OS path to for a file.
76 76 """
77 77 return to_os_path(path, self.root_dir)
78 78
79 79 def dir_exists(self, path):
80 80 """Does the API-style path refer to an extant directory?
81 81
82 82 API-style wrapper for os.path.isdir
83 83
84 84 Parameters
85 85 ----------
86 86 path : string
87 87 The path to check. This is an API path (`/` separated,
88 88 relative to root_dir).
89 89
90 90 Returns
91 91 -------
92 92 exists : bool
93 93 Whether the path is indeed a directory.
94 94 """
95 95 path = path.strip('/')
96 96 os_path = self._get_os_path(path=path)
97 97 return os.path.isdir(os_path)
98 98
99 99 def is_hidden(self, path):
100 100 """Does the API style path correspond to a hidden directory or file?
101 101
102 102 Parameters
103 103 ----------
104 104 path : string
105 105 The path to check. This is an API path (`/` separated,
106 106 relative to root_dir).
107 107
108 108 Returns
109 109 -------
110 110 hidden : bool
111 111 Whether the path exists and is hidden.
112 112 """
113 113 path = path.strip('/')
114 114 os_path = self._get_os_path(path=path)
115 115 return is_hidden(os_path, self.root_dir)
116 116
117 117 def file_exists(self, path):
118 118 """Returns True if the file exists, else returns False.
119 119
120 120 API-style wrapper for os.path.isfile
121 121
122 122 Parameters
123 123 ----------
124 124 path : string
125 125 The relative path to the file (with '/' as separator)
126 126
127 127 Returns
128 128 -------
129 129 exists : bool
130 130 Whether the file exists.
131 131 """
132 132 path = path.strip('/')
133 133 os_path = self._get_os_path(path)
134 134 return os.path.isfile(os_path)
135 135
136 136 def exists(self, path):
137 137 """Returns True if the path exists, else returns False.
138 138
139 139 API-style wrapper for os.path.exists
140 140
141 141 Parameters
142 142 ----------
143 143 path : string
144 144 The API path to the file (with '/' as separator)
145 145
146 146 Returns
147 147 -------
148 148 exists : bool
149 149 Whether the target exists.
150 150 """
151 151 path = path.strip('/')
152 152 os_path = self._get_os_path(path=path)
153 153 return os.path.exists(os_path)
154 154
155 155 def _base_model(self, path):
156 156 """Build the common base of a contents model"""
157 157 os_path = self._get_os_path(path)
158 158 info = os.stat(os_path)
159 159 last_modified = tz.utcfromtimestamp(info.st_mtime)
160 160 created = tz.utcfromtimestamp(info.st_ctime)
161 161 # Create the base model.
162 162 model = {}
163 163 model['name'] = path.rsplit('/', 1)[-1]
164 164 model['path'] = path
165 165 model['last_modified'] = last_modified
166 166 model['created'] = created
167 167 model['content'] = None
168 168 model['format'] = None
169 169 return model
170 170
171 171 def _dir_model(self, path, content=True):
172 172 """Build a model for a directory
173 173
174 174 if content is requested, will include a listing of the directory
175 175 """
176 176 os_path = self._get_os_path(path)
177 177
178 178 four_o_four = u'directory does not exist: %r' % os_path
179 179
180 180 if not os.path.isdir(os_path):
181 181 raise web.HTTPError(404, four_o_four)
182 182 elif is_hidden(os_path, self.root_dir):
183 183 self.log.info("Refusing to serve hidden directory %r, via 404 Error",
184 184 os_path
185 185 )
186 186 raise web.HTTPError(404, four_o_four)
187 187
188 188 model = self._base_model(path)
189 189 model['type'] = 'directory'
190 190 if content:
191 191 model['content'] = contents = []
192 192 os_dir = self._get_os_path(path)
193 193 for name in os.listdir(os_dir):
194 194 os_path = os.path.join(os_dir, name)
195 195 # skip over broken symlinks in listing
196 196 if not os.path.exists(os_path):
197 197 self.log.warn("%s doesn't exist", os_path)
198 198 continue
199 199 elif not os.path.isfile(os_path) and not os.path.isdir(os_path):
200 200 self.log.debug("%s not a regular file", os_path)
201 201 continue
202 202 if self.should_list(name) and not is_hidden(os_path, self.root_dir):
203 203 contents.append(self.get_model(
204 204 path='%s/%s' % (path, name),
205 205 content=False)
206 206 )
207 207
208 208 model['format'] = 'json'
209 209
210 210 return model
211 211
212 def _file_model(self, path, content=True):
212 def _file_model(self, path, content=True, format=None):
213 213 """Build a model for a file
214 214
215 215 if content is requested, include the file contents.
216 UTF-8 text files will be unicode, binary files will be base64-encoded.
216
217 format:
218 If 'text', the contents will be decoded as UTF-8.
219 If 'base64', the raw bytes contents will be encoded as base64.
220 If not specified, try to decode as UTF-8, and fall back to base64
217 221 """
218 222 model = self._base_model(path)
219 223 model['type'] = 'file'
220 224 if content:
221 225 os_path = self._get_os_path(path)
222 226 if not os.path.isfile(os_path):
223 227 # could be FIFO
224 228 raise web.HTTPError(400, "Cannot get content of non-file %s" % os_path)
225 229 with io.open(os_path, 'rb') as f:
226 230 bcontent = f.read()
231
232 if format != 'base64':
227 233 try:
228 234 model['content'] = bcontent.decode('utf8')
229 235 except UnicodeError as e:
230 model['content'] = base64.encodestring(bcontent).decode('ascii')
231 model['format'] = 'base64'
236 if format == 'text':
237 raise web.HTTPError(400, "%s is not UTF-8 encoded" % path)
232 238 else:
233 239 model['format'] = 'text'
240
241 if model['content'] is None:
242 model['content'] = base64.encodestring(bcontent).decode('ascii')
243 model['format'] = 'base64'
244
234 245 return model
235 246
236 247
237 248 def _notebook_model(self, path, content=True):
238 249 """Build a notebook model
239 250
240 251 if content is requested, the notebook content will be populated
241 252 as a JSON structure (not double-serialized)
242 253 """
243 254 model = self._base_model(path)
244 255 model['type'] = 'notebook'
245 256 if content:
246 257 os_path = self._get_os_path(path)
247 258 with io.open(os_path, 'r', encoding='utf-8') as f:
248 259 try:
249 260 nb = nbformat.read(f, as_version=4)
250 261 except Exception as e:
251 262 raise web.HTTPError(400, u"Unreadable Notebook: %s %r" % (os_path, e))
252 263 self.mark_trusted_cells(nb, path)
253 264 model['content'] = nb
254 265 model['format'] = 'json'
255 266 self.validate_notebook_model(model)
256 267 return model
257 268
258 def get_model(self, path, content=True, type_=None):
269 def get_model(self, path, content=True, type_=None, format=None):
259 270 """ Takes a path for an entity and returns its model
260 271
261 272 Parameters
262 273 ----------
263 274 path : str
264 275 the API path that describes the relative path for the target
265 276 content : bool
266 277 Whether to include the contents in the reply
267 278 type_ : str, optional
268 279 The requested type - 'file', 'notebook', or 'directory'.
269 280 Will raise HTTPError 406 if the content doesn't match.
281 format : str, optional
282 The requested format for file contents. 'text' or 'base64'.
283 Ignored if this returns a notebook or directory model.
270 284
271 285 Returns
272 286 -------
273 287 model : dict
274 288 the contents model. If content=True, returns the contents
275 289 of the file or directory as well.
276 290 """
277 291 path = path.strip('/')
278 292
279 293 if not self.exists(path):
280 294 raise web.HTTPError(404, u'No such file or directory: %s' % path)
281 295
282 296 os_path = self._get_os_path(path)
283 297 if os.path.isdir(os_path):
284 298 if type_ not in (None, 'directory'):
285 299 raise web.HTTPError(400,
286 300 u'%s is a directory, not a %s' % (path, type_))
287 301 model = self._dir_model(path, content=content)
288 302 elif type_ == 'notebook' or (type_ is None and path.endswith('.ipynb')):
289 303 model = self._notebook_model(path, content=content)
290 304 else:
291 305 if type_ == 'directory':
292 306 raise web.HTTPError(400,
293 307 u'%s is not a directory')
294 model = self._file_model(path, content=content)
308 model = self._file_model(path, content=content, format=format)
295 309 return model
296 310
297 311 def _save_notebook(self, os_path, model, path=''):
298 312 """save a notebook file"""
299 313 # Save the notebook file
300 314 nb = nbformat.from_dict(model['content'])
301 315
302 316 self.check_and_sign(nb, path)
303 317
304 318 with atomic_writing(os_path, encoding='utf-8') as f:
305 319 nbformat.write(nb, f, version=nbformat.NO_CONVERT)
306 320
307 321 def _save_file(self, os_path, model, path=''):
308 322 """save a non-notebook file"""
309 323 fmt = model.get('format', None)
310 324 if fmt not in {'text', 'base64'}:
311 325 raise web.HTTPError(400, "Must specify format of file contents as 'text' or 'base64'")
312 326 try:
313 327 content = model['content']
314 328 if fmt == 'text':
315 329 bcontent = content.encode('utf8')
316 330 else:
317 331 b64_bytes = content.encode('ascii')
318 332 bcontent = base64.decodestring(b64_bytes)
319 333 except Exception as e:
320 334 raise web.HTTPError(400, u'Encoding error saving %s: %s' % (os_path, e))
321 335 with atomic_writing(os_path, text=False) as f:
322 336 f.write(bcontent)
323 337
324 338 def _save_directory(self, os_path, model, path=''):
325 339 """create a directory"""
326 340 if is_hidden(os_path, self.root_dir):
327 341 raise web.HTTPError(400, u'Cannot create hidden directory %r' % os_path)
328 342 if not os.path.exists(os_path):
329 343 os.mkdir(os_path)
330 344 elif not os.path.isdir(os_path):
331 345 raise web.HTTPError(400, u'Not a directory: %s' % (os_path))
332 346 else:
333 347 self.log.debug("Directory %r already exists", os_path)
334 348
335 349 def save(self, model, path=''):
336 350 """Save the file model and return the model with no content."""
337 351 path = path.strip('/')
338 352
339 353 if 'type' not in model:
340 354 raise web.HTTPError(400, u'No file type provided')
341 355 if 'content' not in model and model['type'] != 'directory':
342 356 raise web.HTTPError(400, u'No file content provided')
343 357
344 358 # One checkpoint should always exist
345 359 if self.file_exists(path) and not self.list_checkpoints(path):
346 360 self.create_checkpoint(path)
347 361
348 362 os_path = self._get_os_path(path)
349 363 self.log.debug("Saving %s", os_path)
350 364 try:
351 365 if model['type'] == 'notebook':
352 366 self._save_notebook(os_path, model, path)
353 367 elif model['type'] == 'file':
354 368 self._save_file(os_path, model, path)
355 369 elif model['type'] == 'directory':
356 370 self._save_directory(os_path, model, path)
357 371 else:
358 372 raise web.HTTPError(400, "Unhandled contents type: %s" % model['type'])
359 373 except web.HTTPError:
360 374 raise
361 375 except Exception as e:
362 376 raise web.HTTPError(400, u'Unexpected error while saving file: %s %s' % (os_path, e))
363 377
364 378 validation_message = None
365 379 if model['type'] == 'notebook':
366 380 self.validate_notebook_model(model)
367 381 validation_message = model.get('message', None)
368 382
369 383 model = self.get_model(path, content=False)
370 384 if validation_message:
371 385 model['message'] = validation_message
372 386 return model
373 387
374 388 def update(self, model, path):
375 389 """Update the file's path
376 390
377 391 For use in PATCH requests, to enable renaming a file without
378 392 re-uploading its contents. Only used for renaming at the moment.
379 393 """
380 394 path = path.strip('/')
381 395 new_path = model.get('path', path).strip('/')
382 396 if path != new_path:
383 397 self.rename(path, new_path)
384 398 model = self.get_model(new_path, content=False)
385 399 return model
386 400
387 401 def delete(self, path):
388 402 """Delete file at path."""
389 403 path = path.strip('/')
390 404 os_path = self._get_os_path(path)
391 405 rm = os.unlink
392 406 if os.path.isdir(os_path):
393 407 listing = os.listdir(os_path)
394 408 # don't delete non-empty directories (checkpoints dir doesn't count)
395 409 if listing and listing != [self.checkpoint_dir]:
396 410 raise web.HTTPError(400, u'Directory %s not empty' % os_path)
397 411 elif not os.path.isfile(os_path):
398 412 raise web.HTTPError(404, u'File does not exist: %s' % os_path)
399 413
400 414 # clear checkpoints
401 415 for checkpoint in self.list_checkpoints(path):
402 416 checkpoint_id = checkpoint['id']
403 417 cp_path = self.get_checkpoint_path(checkpoint_id, path)
404 418 if os.path.isfile(cp_path):
405 419 self.log.debug("Unlinking checkpoint %s", cp_path)
406 420 os.unlink(cp_path)
407 421
408 422 if os.path.isdir(os_path):
409 423 self.log.debug("Removing directory %s", os_path)
410 424 shutil.rmtree(os_path)
411 425 else:
412 426 self.log.debug("Unlinking file %s", os_path)
413 427 rm(os_path)
414 428
415 429 def rename(self, old_path, new_path):
416 430 """Rename a file."""
417 431 old_path = old_path.strip('/')
418 432 new_path = new_path.strip('/')
419 433 if new_path == old_path:
420 434 return
421 435
422 436 new_os_path = self._get_os_path(new_path)
423 437 old_os_path = self._get_os_path(old_path)
424 438
425 439 # Should we proceed with the move?
426 440 if os.path.exists(new_os_path):
427 441 raise web.HTTPError(409, u'File already exists: %s' % new_path)
428 442
429 443 # Move the file
430 444 try:
431 445 shutil.move(old_os_path, new_os_path)
432 446 except Exception as e:
433 447 raise web.HTTPError(500, u'Unknown error renaming file: %s %s' % (old_path, e))
434 448
435 449 # Move the checkpoints
436 450 old_checkpoints = self.list_checkpoints(old_path)
437 451 for cp in old_checkpoints:
438 452 checkpoint_id = cp['id']
439 453 old_cp_path = self.get_checkpoint_path(checkpoint_id, old_path)
440 454 new_cp_path = self.get_checkpoint_path(checkpoint_id, new_path)
441 455 if os.path.isfile(old_cp_path):
442 456 self.log.debug("Renaming checkpoint %s -> %s", old_cp_path, new_cp_path)
443 457 shutil.move(old_cp_path, new_cp_path)
444 458
445 459 # Checkpoint-related utilities
446 460
447 461 def get_checkpoint_path(self, checkpoint_id, path):
448 462 """find the path to a checkpoint"""
449 463 path = path.strip('/')
450 464 parent, name = ('/' + path).rsplit('/', 1)
451 465 parent = parent.strip('/')
452 466 basename, ext = os.path.splitext(name)
453 467 filename = u"{name}-{checkpoint_id}{ext}".format(
454 468 name=basename,
455 469 checkpoint_id=checkpoint_id,
456 470 ext=ext,
457 471 )
458 472 os_path = self._get_os_path(path=parent)
459 473 cp_dir = os.path.join(os_path, self.checkpoint_dir)
460 474 ensure_dir_exists(cp_dir)
461 475 cp_path = os.path.join(cp_dir, filename)
462 476 return cp_path
463 477
464 478 def get_checkpoint_model(self, checkpoint_id, path):
465 479 """construct the info dict for a given checkpoint"""
466 480 path = path.strip('/')
467 481 cp_path = self.get_checkpoint_path(checkpoint_id, path)
468 482 stats = os.stat(cp_path)
469 483 last_modified = tz.utcfromtimestamp(stats.st_mtime)
470 484 info = dict(
471 485 id = checkpoint_id,
472 486 last_modified = last_modified,
473 487 )
474 488 return info
475 489
476 490 # public checkpoint API
477 491
478 492 def create_checkpoint(self, path):
479 493 """Create a checkpoint from the current state of a file"""
480 494 path = path.strip('/')
481 495 if not self.file_exists(path):
482 496 raise web.HTTPError(404)
483 497 src_path = self._get_os_path(path)
484 498 # only the one checkpoint ID:
485 499 checkpoint_id = u"checkpoint"
486 500 cp_path = self.get_checkpoint_path(checkpoint_id, path)
487 501 self.log.debug("creating checkpoint for %s", path)
488 502 self._copy(src_path, cp_path)
489 503
490 504 # return the checkpoint info
491 505 return self.get_checkpoint_model(checkpoint_id, path)
492 506
493 507 def list_checkpoints(self, path):
494 508 """list the checkpoints for a given file
495 509
496 510 This contents manager currently only supports one checkpoint per file.
497 511 """
498 512 path = path.strip('/')
499 513 checkpoint_id = "checkpoint"
500 514 os_path = self.get_checkpoint_path(checkpoint_id, path)
501 515 if not os.path.exists(os_path):
502 516 return []
503 517 else:
504 518 return [self.get_checkpoint_model(checkpoint_id, path)]
505 519
506 520
507 521 def restore_checkpoint(self, checkpoint_id, path):
508 522 """restore a file to a checkpointed state"""
509 523 path = path.strip('/')
510 524 self.log.info("restoring %s from checkpoint %s", path, checkpoint_id)
511 525 nb_path = self._get_os_path(path)
512 526 cp_path = self.get_checkpoint_path(checkpoint_id, path)
513 527 if not os.path.isfile(cp_path):
514 528 self.log.debug("checkpoint file does not exist: %s", cp_path)
515 529 raise web.HTTPError(404,
516 530 u'checkpoint does not exist: %s@%s' % (path, checkpoint_id)
517 531 )
518 532 # ensure notebook is readable (never restore from an unreadable notebook)
519 533 if cp_path.endswith('.ipynb'):
520 534 with io.open(cp_path, 'r', encoding='utf-8') as f:
521 535 nbformat.read(f, as_version=4)
522 536 self._copy(cp_path, nb_path)
523 537 self.log.debug("copying %s -> %s", cp_path, nb_path)
524 538
525 539 def delete_checkpoint(self, checkpoint_id, path):
526 540 """delete a file's checkpoint"""
527 541 path = path.strip('/')
528 542 cp_path = self.get_checkpoint_path(checkpoint_id, path)
529 543 if not os.path.isfile(cp_path):
530 544 raise web.HTTPError(404,
531 545 u'Checkpoint does not exist: %s@%s' % (path, checkpoint_id)
532 546 )
533 547 self.log.debug("unlinking %s", cp_path)
534 548 os.unlink(cp_path)
535 549
536 550 def info_string(self):
537 551 return "Serving notebooks from local directory: %s" % self.root_dir
538 552
539 553 def get_kernel_path(self, path, model=None):
540 554 """Return the initial working dir a kernel associated with a given notebook"""
541 555 if '/' in path:
542 556 parent_dir = path.rsplit('/', 1)[0]
543 557 else:
544 558 parent_dir = ''
545 559 return self._get_os_path(parent_dir)
@@ -1,261 +1,265 b''
1 1 """Tornado handlers for the contents web service."""
2 2
3 3 # Copyright (c) IPython Development Team.
4 4 # Distributed under the terms of the Modified BSD License.
5 5
6 6 import json
7 7
8 8 from tornado import web
9 9
10 10 from IPython.html.utils import url_path_join, url_escape
11 11 from IPython.utils.jsonutil import date_default
12 12
13 13 from IPython.html.base.handlers import (
14 14 IPythonHandler, json_errors, path_regex,
15 15 )
16 16
17 17
18 18 def sort_key(model):
19 19 """key function for case-insensitive sort by name and type"""
20 20 iname = model['name'].lower()
21 21 type_key = {
22 22 'directory' : '0',
23 23 'notebook' : '1',
24 24 'file' : '2',
25 25 }.get(model['type'], '9')
26 26 return u'%s%s' % (type_key, iname)
27 27
28 28 class ContentsHandler(IPythonHandler):
29 29
30 30 SUPPORTED_METHODS = (u'GET', u'PUT', u'PATCH', u'POST', u'DELETE')
31 31
32 32 def location_url(self, path):
33 33 """Return the full URL location of a file.
34 34
35 35 Parameters
36 36 ----------
37 37 path : unicode
38 38 The API path of the file, such as "foo/bar.txt".
39 39 """
40 40 return url_escape(url_path_join(
41 41 self.base_url, 'api', 'contents', path
42 42 ))
43 43
44 44 def _finish_model(self, model, location=True):
45 45 """Finish a JSON request with a model, setting relevant headers, etc."""
46 46 if location:
47 47 location = self.location_url(model['path'])
48 48 self.set_header('Location', location)
49 49 self.set_header('Last-Modified', model['last_modified'])
50 50 self.finish(json.dumps(model, default=date_default))
51 51
52 52 @web.authenticated
53 53 @json_errors
54 54 def get(self, path=''):
55 55 """Return a model for a file or directory.
56 56
57 57 A directory model contains a list of models (without content)
58 58 of the files and directories it contains.
59 59 """
60 60 path = path or ''
61 61 type_ = self.get_query_argument('type', default=None)
62 62 if type_ not in {None, 'directory', 'file', 'notebook'}:
63 63 raise web.HTTPError(400, u'Type %r is invalid' % type_)
64 64
65 model = self.contents_manager.get_model(path=path, type_=type_)
65 format = self.get_query_argument('format', default=None)#
66 if format not in {None, 'text', 'base64'}:
67 raise web.HTTPError(400, u'Format %r is invalid' % format)
68
69 model = self.contents_manager.get_model(path=path, type_=type_, format=format)
66 70 if model['type'] == 'directory':
67 71 # group listing by type, then by name (case-insensitive)
68 72 # FIXME: sorting should be done in the frontends
69 73 model['content'].sort(key=sort_key)
70 74 self._finish_model(model, location=False)
71 75
72 76 @web.authenticated
73 77 @json_errors
74 78 def patch(self, path=''):
75 79 """PATCH renames a file or directory without re-uploading content."""
76 80 cm = self.contents_manager
77 81 model = self.get_json_body()
78 82 if model is None:
79 83 raise web.HTTPError(400, u'JSON body missing')
80 84 model = cm.update(model, path)
81 85 self._finish_model(model)
82 86
83 87 def _copy(self, copy_from, copy_to=None):
84 88 """Copy a file, optionally specifying a target directory."""
85 89 self.log.info(u"Copying {copy_from} to {copy_to}".format(
86 90 copy_from=copy_from,
87 91 copy_to=copy_to or '',
88 92 ))
89 93 model = self.contents_manager.copy(copy_from, copy_to)
90 94 self.set_status(201)
91 95 self._finish_model(model)
92 96
93 97 def _upload(self, model, path):
94 98 """Handle upload of a new file to path"""
95 99 self.log.info(u"Uploading file to %s", path)
96 100 model = self.contents_manager.new(model, path)
97 101 self.set_status(201)
98 102 self._finish_model(model)
99 103
100 104 def _new_untitled(self, path, type='', ext=''):
101 105 """Create a new, empty untitled entity"""
102 106 self.log.info(u"Creating new %s in %s", type or 'file', path)
103 107 model = self.contents_manager.new_untitled(path=path, type=type, ext=ext)
104 108 self.set_status(201)
105 109 self._finish_model(model)
106 110
107 111 def _save(self, model, path):
108 112 """Save an existing file."""
109 113 self.log.info(u"Saving file at %s", path)
110 114 model = self.contents_manager.save(model, path)
111 115 self._finish_model(model)
112 116
113 117 @web.authenticated
114 118 @json_errors
115 119 def post(self, path=''):
116 120 """Create a new file in the specified path.
117 121
118 122 POST creates new files. The server always decides on the name.
119 123
120 124 POST /api/contents/path
121 125 New untitled, empty file or directory.
122 126 POST /api/contents/path
123 127 with body {"copy_from" : "/path/to/OtherNotebook.ipynb"}
124 128 New copy of OtherNotebook in path
125 129 """
126 130
127 131 cm = self.contents_manager
128 132
129 133 if cm.file_exists(path):
130 134 raise web.HTTPError(400, "Cannot POST to files, use PUT instead.")
131 135
132 136 if not cm.dir_exists(path):
133 137 raise web.HTTPError(404, "No such directory: %s" % path)
134 138
135 139 model = self.get_json_body()
136 140
137 141 if model is not None:
138 142 copy_from = model.get('copy_from')
139 143 ext = model.get('ext', '')
140 144 type = model.get('type', '')
141 145 if copy_from:
142 146 self._copy(copy_from, path)
143 147 else:
144 148 self._new_untitled(path, type=type, ext=ext)
145 149 else:
146 150 self._new_untitled(path)
147 151
148 152 @web.authenticated
149 153 @json_errors
150 154 def put(self, path=''):
151 155 """Saves the file in the location specified by name and path.
152 156
153 157 PUT is very similar to POST, but the requester specifies the name,
154 158 whereas with POST, the server picks the name.
155 159
156 160 PUT /api/contents/path/Name.ipynb
157 161 Save notebook at ``path/Name.ipynb``. Notebook structure is specified
158 162 in `content` key of JSON request body. If content is not specified,
159 163 create a new empty notebook.
160 164 """
161 165 model = self.get_json_body()
162 166 if model:
163 167 if model.get('copy_from'):
164 168 raise web.HTTPError(400, "Cannot copy with PUT, only POST")
165 169 if self.contents_manager.file_exists(path):
166 170 self._save(model, path)
167 171 else:
168 172 self._upload(model, path)
169 173 else:
170 174 self._new_untitled(path)
171 175
172 176 @web.authenticated
173 177 @json_errors
174 178 def delete(self, path=''):
175 179 """delete a file in the given path"""
176 180 cm = self.contents_manager
177 181 self.log.warn('delete %s', path)
178 182 cm.delete(path)
179 183 self.set_status(204)
180 184 self.finish()
181 185
182 186
183 187 class CheckpointsHandler(IPythonHandler):
184 188
185 189 SUPPORTED_METHODS = ('GET', 'POST')
186 190
187 191 @web.authenticated
188 192 @json_errors
189 193 def get(self, path=''):
190 194 """get lists checkpoints for a file"""
191 195 cm = self.contents_manager
192 196 checkpoints = cm.list_checkpoints(path)
193 197 data = json.dumps(checkpoints, default=date_default)
194 198 self.finish(data)
195 199
196 200 @web.authenticated
197 201 @json_errors
198 202 def post(self, path=''):
199 203 """post creates a new checkpoint"""
200 204 cm = self.contents_manager
201 205 checkpoint = cm.create_checkpoint(path)
202 206 data = json.dumps(checkpoint, default=date_default)
203 207 location = url_path_join(self.base_url, 'api/contents',
204 208 path, 'checkpoints', checkpoint['id'])
205 209 self.set_header('Location', url_escape(location))
206 210 self.set_status(201)
207 211 self.finish(data)
208 212
209 213
210 214 class ModifyCheckpointsHandler(IPythonHandler):
211 215
212 216 SUPPORTED_METHODS = ('POST', 'DELETE')
213 217
214 218 @web.authenticated
215 219 @json_errors
216 220 def post(self, path, checkpoint_id):
217 221 """post restores a file from a checkpoint"""
218 222 cm = self.contents_manager
219 223 cm.restore_checkpoint(checkpoint_id, path)
220 224 self.set_status(204)
221 225 self.finish()
222 226
223 227 @web.authenticated
224 228 @json_errors
225 229 def delete(self, path, checkpoint_id):
226 230 """delete clears a checkpoint for a given file"""
227 231 cm = self.contents_manager
228 232 cm.delete_checkpoint(checkpoint_id, path)
229 233 self.set_status(204)
230 234 self.finish()
231 235
232 236
233 237 class NotebooksRedirectHandler(IPythonHandler):
234 238 """Redirect /api/notebooks to /api/contents"""
235 239 SUPPORTED_METHODS = ('GET', 'PUT', 'PATCH', 'POST', 'DELETE')
236 240
237 241 def get(self, path):
238 242 self.log.warn("/api/notebooks is deprecated, use /api/contents")
239 243 self.redirect(url_path_join(
240 244 self.base_url,
241 245 'api/contents',
242 246 path
243 247 ))
244 248
245 249 put = patch = post = delete = get
246 250
247 251
248 252 #-----------------------------------------------------------------------------
249 253 # URL to handler mappings
250 254 #-----------------------------------------------------------------------------
251 255
252 256
253 257 _checkpoint_id_regex = r"(?P<checkpoint_id>[\w-]+)"
254 258
255 259 default_handlers = [
256 260 (r"/api/contents%s/checkpoints" % path_regex, CheckpointsHandler),
257 261 (r"/api/contents%s/checkpoints/%s" % (path_regex, _checkpoint_id_regex),
258 262 ModifyCheckpointsHandler),
259 263 (r"/api/contents%s" % path_regex, ContentsHandler),
260 264 (r"/api/notebooks/?(.*)", NotebooksRedirectHandler),
261 265 ]
@@ -1,373 +1,373 b''
1 1 """A base class for contents managers."""
2 2
3 3 # Copyright (c) IPython Development Team.
4 4 # Distributed under the terms of the Modified BSD License.
5 5
6 6 from fnmatch import fnmatch
7 7 import itertools
8 8 import json
9 9 import os
10 10
11 11 from tornado.web import HTTPError
12 12
13 13 from IPython.config.configurable import LoggingConfigurable
14 14 from IPython.nbformat import sign, validate, ValidationError
15 15 from IPython.nbformat.v4 import new_notebook
16 16 from IPython.utils.traitlets import Instance, Unicode, List
17 17
18 18
19 19 class ContentsManager(LoggingConfigurable):
20 20 """Base class for serving files and directories.
21 21
22 22 This serves any text or binary file,
23 23 as well as directories,
24 24 with special handling for JSON notebook documents.
25 25
26 26 Most APIs take a path argument,
27 27 which is always an API-style unicode path,
28 28 and always refers to a directory.
29 29
30 30 - unicode, not url-escaped
31 31 - '/'-separated
32 32 - leading and trailing '/' will be stripped
33 33 - if unspecified, path defaults to '',
34 34 indicating the root path.
35 35
36 36 """
37 37
38 38 notary = Instance(sign.NotebookNotary)
39 39 def _notary_default(self):
40 40 return sign.NotebookNotary(parent=self)
41 41
42 42 hide_globs = List(Unicode, [
43 43 u'__pycache__', '*.pyc', '*.pyo',
44 44 '.DS_Store', '*.so', '*.dylib', '*~',
45 45 ], config=True, help="""
46 46 Glob patterns to hide in file and directory listings.
47 47 """)
48 48
49 49 untitled_notebook = Unicode("Untitled", config=True,
50 50 help="The base name used when creating untitled notebooks."
51 51 )
52 52
53 53 untitled_file = Unicode("untitled", config=True,
54 54 help="The base name used when creating untitled files."
55 55 )
56 56
57 57 untitled_directory = Unicode("Untitled Folder", config=True,
58 58 help="The base name used when creating untitled directories."
59 59 )
60 60
61 61 # ContentsManager API part 1: methods that must be
62 62 # implemented in subclasses.
63 63
64 64 def dir_exists(self, path):
65 65 """Does the API-style path (directory) actually exist?
66 66
67 67 Like os.path.isdir
68 68
69 69 Override this method in subclasses.
70 70
71 71 Parameters
72 72 ----------
73 73 path : string
74 74 The path to check
75 75
76 76 Returns
77 77 -------
78 78 exists : bool
79 79 Whether the path does indeed exist.
80 80 """
81 81 raise NotImplementedError
82 82
83 83 def is_hidden(self, path):
84 84 """Does the API style path correspond to a hidden directory or file?
85 85
86 86 Parameters
87 87 ----------
88 88 path : string
89 89 The path to check. This is an API path (`/` separated,
90 90 relative to root dir).
91 91
92 92 Returns
93 93 -------
94 94 hidden : bool
95 95 Whether the path is hidden.
96 96
97 97 """
98 98 raise NotImplementedError
99 99
100 100 def file_exists(self, path=''):
101 101 """Does a file exist at the given path?
102 102
103 103 Like os.path.isfile
104 104
105 105 Override this method in subclasses.
106 106
107 107 Parameters
108 108 ----------
109 109 name : string
110 110 The name of the file you are checking.
111 111 path : string
112 112 The relative path to the file's directory (with '/' as separator)
113 113
114 114 Returns
115 115 -------
116 116 exists : bool
117 117 Whether the file exists.
118 118 """
119 119 raise NotImplementedError('must be implemented in a subclass')
120 120
121 121 def exists(self, path):
122 122 """Does a file or directory exist at the given path?
123 123
124 124 Like os.path.exists
125 125
126 126 Parameters
127 127 ----------
128 128 path : string
129 129 The relative path to the file's directory (with '/' as separator)
130 130
131 131 Returns
132 132 -------
133 133 exists : bool
134 134 Whether the target exists.
135 135 """
136 136 return self.file_exists(path) or self.dir_exists(path)
137 137
138 def get_model(self, path, content=True, type_=None):
138 def get_model(self, path, content=True, type_=None, format=None):
139 139 """Get the model of a file or directory with or without content."""
140 140 raise NotImplementedError('must be implemented in a subclass')
141 141
142 142 def save(self, model, path):
143 143 """Save the file or directory and return the model with no content."""
144 144 raise NotImplementedError('must be implemented in a subclass')
145 145
146 146 def update(self, model, path):
147 147 """Update the file or directory and return the model with no content.
148 148
149 149 For use in PATCH requests, to enable renaming a file without
150 150 re-uploading its contents. Only used for renaming at the moment.
151 151 """
152 152 raise NotImplementedError('must be implemented in a subclass')
153 153
154 154 def delete(self, path):
155 155 """Delete file or directory by path."""
156 156 raise NotImplementedError('must be implemented in a subclass')
157 157
158 158 def create_checkpoint(self, path):
159 159 """Create a checkpoint of the current state of a file
160 160
161 161 Returns a checkpoint_id for the new checkpoint.
162 162 """
163 163 raise NotImplementedError("must be implemented in a subclass")
164 164
165 165 def list_checkpoints(self, path):
166 166 """Return a list of checkpoints for a given file"""
167 167 return []
168 168
169 169 def restore_checkpoint(self, checkpoint_id, path):
170 170 """Restore a file from one of its checkpoints"""
171 171 raise NotImplementedError("must be implemented in a subclass")
172 172
173 173 def delete_checkpoint(self, checkpoint_id, path):
174 174 """delete a checkpoint for a file"""
175 175 raise NotImplementedError("must be implemented in a subclass")
176 176
177 177 # ContentsManager API part 2: methods that have useable default
178 178 # implementations, but can be overridden in subclasses.
179 179
180 180 def info_string(self):
181 181 return "Serving contents"
182 182
183 183 def get_kernel_path(self, path, model=None):
184 184 """Return the API path for the kernel
185 185
186 186 KernelManagers can turn this value into a filesystem path,
187 187 or ignore it altogether.
188 188 """
189 189 return path
190 190
191 191 def increment_filename(self, filename, path=''):
192 192 """Increment a filename until it is unique.
193 193
194 194 Parameters
195 195 ----------
196 196 filename : unicode
197 197 The name of a file, including extension
198 198 path : unicode
199 199 The API path of the target's directory
200 200
201 201 Returns
202 202 -------
203 203 name : unicode
204 204 A filename that is unique, based on the input filename.
205 205 """
206 206 path = path.strip('/')
207 207 basename, ext = os.path.splitext(filename)
208 208 for i in itertools.count():
209 209 name = u'{basename}{i}{ext}'.format(basename=basename, i=i,
210 210 ext=ext)
211 211 if not self.exists(u'{}/{}'.format(path, name)):
212 212 break
213 213 return name
214 214
215 215 def validate_notebook_model(self, model):
216 216 """Add failed-validation message to model"""
217 217 try:
218 218 validate(model['content'])
219 219 except ValidationError as e:
220 220 model['message'] = u'Notebook Validation failed: {}:\n{}'.format(
221 221 e.message, json.dumps(e.instance, indent=1, default=lambda obj: '<UNKNOWN>'),
222 222 )
223 223 return model
224 224
225 225 def new_untitled(self, path='', type='', ext=''):
226 226 """Create a new untitled file or directory in path
227 227
228 228 path must be a directory
229 229
230 230 File extension can be specified.
231 231
232 232 Use `new` to create files with a fully specified path (including filename).
233 233 """
234 234 path = path.strip('/')
235 235 if not self.dir_exists(path):
236 236 raise HTTPError(404, 'No such directory: %s' % path)
237 237
238 238 model = {}
239 239 if type:
240 240 model['type'] = type
241 241
242 242 if ext == '.ipynb':
243 243 model.setdefault('type', 'notebook')
244 244 else:
245 245 model.setdefault('type', 'file')
246 246
247 247 if model['type'] == 'directory':
248 248 untitled = self.untitled_directory
249 249 elif model['type'] == 'notebook':
250 250 untitled = self.untitled_notebook
251 251 ext = '.ipynb'
252 252 elif model['type'] == 'file':
253 253 untitled = self.untitled_file
254 254 else:
255 255 raise HTTPError(400, "Unexpected model type: %r" % model['type'])
256 256
257 257 name = self.increment_filename(untitled + ext, path)
258 258 path = u'{0}/{1}'.format(path, name)
259 259 return self.new(model, path)
260 260
261 261 def new(self, model=None, path=''):
262 262 """Create a new file or directory and return its model with no content.
263 263
264 264 To create a new untitled entity in a directory, use `new_untitled`.
265 265 """
266 266 path = path.strip('/')
267 267 if model is None:
268 268 model = {}
269 269
270 270 if path.endswith('.ipynb'):
271 271 model.setdefault('type', 'notebook')
272 272 else:
273 273 model.setdefault('type', 'file')
274 274
275 275 # no content, not a directory, so fill out new-file model
276 276 if 'content' not in model and model['type'] != 'directory':
277 277 if model['type'] == 'notebook':
278 278 model['content'] = new_notebook()
279 279 model['format'] = 'json'
280 280 else:
281 281 model['content'] = ''
282 282 model['type'] = 'file'
283 283 model['format'] = 'text'
284 284
285 285 model = self.save(model, path)
286 286 return model
287 287
288 288 def copy(self, from_path, to_path=None):
289 289 """Copy an existing file and return its new model.
290 290
291 291 If to_path not specified, it will be the parent directory of from_path.
292 292 If to_path is a directory, filename will increment `from_path-Copy#.ext`.
293 293
294 294 from_path must be a full path to a file.
295 295 """
296 296 path = from_path.strip('/')
297 297 if '/' in path:
298 298 from_dir, from_name = path.rsplit('/', 1)
299 299 else:
300 300 from_dir = ''
301 301 from_name = path
302 302
303 303 model = self.get_model(path)
304 304 model.pop('path', None)
305 305 model.pop('name', None)
306 306 if model['type'] == 'directory':
307 307 raise HTTPError(400, "Can't copy directories")
308 308
309 309 if not to_path:
310 310 to_path = from_dir
311 311 if self.dir_exists(to_path):
312 312 base, ext = os.path.splitext(from_name)
313 313 copy_name = u'{0}-Copy{1}'.format(base, ext)
314 314 to_name = self.increment_filename(copy_name, to_path)
315 315 to_path = u'{0}/{1}'.format(to_path, to_name)
316 316
317 317 model = self.save(model, to_path)
318 318 return model
319 319
320 320 def log_info(self):
321 321 self.log.info(self.info_string())
322 322
323 323 def trust_notebook(self, path):
324 324 """Explicitly trust a notebook
325 325
326 326 Parameters
327 327 ----------
328 328 path : string
329 329 The path of a notebook
330 330 """
331 331 model = self.get_model(path)
332 332 nb = model['content']
333 333 self.log.warn("Trusting notebook %s", path)
334 334 self.notary.mark_cells(nb, True)
335 335 self.save(model, path)
336 336
337 337 def check_and_sign(self, nb, path=''):
338 338 """Check for trusted cells, and sign the notebook.
339 339
340 340 Called as a part of saving notebooks.
341 341
342 342 Parameters
343 343 ----------
344 344 nb : dict
345 345 The notebook dict
346 346 path : string
347 347 The notebook's path (for logging)
348 348 """
349 349 if self.notary.check_cells(nb):
350 350 self.notary.sign(nb)
351 351 else:
352 352 self.log.warn("Saving untrusted notebook %s", path)
353 353
354 354 def mark_trusted_cells(self, nb, path=''):
355 355 """Mark cells as trusted if the notebook signature matches.
356 356
357 357 Called as a part of loading notebooks.
358 358
359 359 Parameters
360 360 ----------
361 361 nb : dict
362 362 The notebook object (in current nbformat)
363 363 path : string
364 364 The notebook's path (for logging)
365 365 """
366 366 trusted = self.notary.check_signature(nb)
367 367 if not trusted:
368 368 self.log.warn("Notebook %s is not trusted", path)
369 369 self.notary.mark_cells(nb, trusted)
370 370
371 371 def should_list(self, name):
372 372 """Should this file/directory name be displayed in a listing?"""
373 373 return not any(fnmatch(name, glob) for glob in self.hide_globs)
@@ -1,504 +1,513 b''
1 1 # coding: utf-8
2 2 """Test the contents webservice API."""
3 3
4 4 import base64
5 5 import io
6 6 import json
7 7 import os
8 8 import shutil
9 9 from unicodedata import normalize
10 10
11 11 pjoin = os.path.join
12 12
13 13 import requests
14 14
15 15 from IPython.html.utils import url_path_join, url_escape
16 16 from IPython.html.tests.launchnotebook import NotebookTestBase, assert_http_error
17 17 from IPython.nbformat import read, write, from_dict
18 18 from IPython.nbformat.v4 import (
19 19 new_notebook, new_markdown_cell,
20 20 )
21 21 from IPython.nbformat import v2
22 22 from IPython.utils import py3compat
23 23 from IPython.utils.data import uniq_stable
24 24
25 25
26 26 def notebooks_only(dir_model):
27 27 return [nb for nb in dir_model['content'] if nb['type']=='notebook']
28 28
29 29 def dirs_only(dir_model):
30 30 return [x for x in dir_model['content'] if x['type']=='directory']
31 31
32 32
33 33 class API(object):
34 34 """Wrapper for contents API calls."""
35 35 def __init__(self, base_url):
36 36 self.base_url = base_url
37 37
38 38 def _req(self, verb, path, body=None):
39 39 response = requests.request(verb,
40 40 url_path_join(self.base_url, 'api/contents', path),
41 41 data=body,
42 42 )
43 43 response.raise_for_status()
44 44 return response
45 45
46 46 def list(self, path='/'):
47 47 return self._req('GET', path)
48 48
49 def read(self, path, type_=None):
49 def read(self, path, type_=None, format=None):
50 query = []
50 51 if type_ is not None:
51 path += '?type=' + type_
52 query.append('type=' + type_)
53 if format is not None:
54 query.append('format=' + format)
55 if query:
56 path += '?' + '&'.join(query)
52 57 return self._req('GET', path)
53 58
54 59 def create_untitled(self, path='/', ext='.ipynb'):
55 60 body = None
56 61 if ext:
57 62 body = json.dumps({'ext': ext})
58 63 return self._req('POST', path, body)
59 64
60 65 def mkdir_untitled(self, path='/'):
61 66 return self._req('POST', path, json.dumps({'type': 'directory'}))
62 67
63 68 def copy(self, copy_from, path='/'):
64 69 body = json.dumps({'copy_from':copy_from})
65 70 return self._req('POST', path, body)
66 71
67 72 def create(self, path='/'):
68 73 return self._req('PUT', path)
69 74
70 75 def upload(self, path, body):
71 76 return self._req('PUT', path, body)
72 77
73 78 def mkdir_untitled(self, path='/'):
74 79 return self._req('POST', path, json.dumps({'type': 'directory'}))
75 80
76 81 def mkdir(self, path='/'):
77 82 return self._req('PUT', path, json.dumps({'type': 'directory'}))
78 83
79 84 def copy_put(self, copy_from, path='/'):
80 85 body = json.dumps({'copy_from':copy_from})
81 86 return self._req('PUT', path, body)
82 87
83 88 def save(self, path, body):
84 89 return self._req('PUT', path, body)
85 90
86 91 def delete(self, path='/'):
87 92 return self._req('DELETE', path)
88 93
89 94 def rename(self, path, new_path):
90 95 body = json.dumps({'path': new_path})
91 96 return self._req('PATCH', path, body)
92 97
93 98 def get_checkpoints(self, path):
94 99 return self._req('GET', url_path_join(path, 'checkpoints'))
95 100
96 101 def new_checkpoint(self, path):
97 102 return self._req('POST', url_path_join(path, 'checkpoints'))
98 103
99 104 def restore_checkpoint(self, path, checkpoint_id):
100 105 return self._req('POST', url_path_join(path, 'checkpoints', checkpoint_id))
101 106
102 107 def delete_checkpoint(self, path, checkpoint_id):
103 108 return self._req('DELETE', url_path_join(path, 'checkpoints', checkpoint_id))
104 109
105 110 class APITest(NotebookTestBase):
106 111 """Test the kernels web service API"""
107 112 dirs_nbs = [('', 'inroot'),
108 113 ('Directory with spaces in', 'inspace'),
109 114 (u'unicodΓ©', 'innonascii'),
110 115 ('foo', 'a'),
111 116 ('foo', 'b'),
112 117 ('foo', 'name with spaces'),
113 118 ('foo', u'unicodΓ©'),
114 119 ('foo/bar', 'baz'),
115 120 ('ordering', 'A'),
116 121 ('ordering', 'b'),
117 122 ('ordering', 'C'),
118 123 (u'Γ₯ b', u'Γ§ d'),
119 124 ]
120 125 hidden_dirs = ['.hidden', '__pycache__']
121 126
122 127 dirs = uniq_stable([py3compat.cast_unicode(d) for (d,n) in dirs_nbs])
123 128 del dirs[0] # remove ''
124 129 top_level_dirs = {normalize('NFC', d.split('/')[0]) for d in dirs}
125 130
126 131 @staticmethod
127 132 def _blob_for_name(name):
128 133 return name.encode('utf-8') + b'\xFF'
129 134
130 135 @staticmethod
131 136 def _txt_for_name(name):
132 137 return u'%s text file' % name
133 138
134 139 def setUp(self):
135 140 nbdir = self.notebook_dir.name
136 141 self.blob = os.urandom(100)
137 142 self.b64_blob = base64.encodestring(self.blob).decode('ascii')
138 143
139 144 for d in (self.dirs + self.hidden_dirs):
140 145 d.replace('/', os.sep)
141 146 if not os.path.isdir(pjoin(nbdir, d)):
142 147 os.mkdir(pjoin(nbdir, d))
143 148
144 149 for d, name in self.dirs_nbs:
145 150 d = d.replace('/', os.sep)
146 151 # create a notebook
147 152 with io.open(pjoin(nbdir, d, '%s.ipynb' % name), 'w',
148 153 encoding='utf-8') as f:
149 154 nb = new_notebook()
150 155 write(nb, f, version=4)
151 156
152 157 # create a text file
153 158 with io.open(pjoin(nbdir, d, '%s.txt' % name), 'w',
154 159 encoding='utf-8') as f:
155 160 f.write(self._txt_for_name(name))
156 161
157 162 # create a binary file
158 163 with io.open(pjoin(nbdir, d, '%s.blob' % name), 'wb') as f:
159 164 f.write(self._blob_for_name(name))
160 165
161 166 self.api = API(self.base_url())
162 167
163 168 def tearDown(self):
164 169 nbdir = self.notebook_dir.name
165 170
166 171 for dname in (list(self.top_level_dirs) + self.hidden_dirs):
167 172 shutil.rmtree(pjoin(nbdir, dname), ignore_errors=True)
168 173
169 174 if os.path.isfile(pjoin(nbdir, 'inroot.ipynb')):
170 175 os.unlink(pjoin(nbdir, 'inroot.ipynb'))
171 176
172 177 def test_list_notebooks(self):
173 178 nbs = notebooks_only(self.api.list().json())
174 179 self.assertEqual(len(nbs), 1)
175 180 self.assertEqual(nbs[0]['name'], 'inroot.ipynb')
176 181
177 182 nbs = notebooks_only(self.api.list('/Directory with spaces in/').json())
178 183 self.assertEqual(len(nbs), 1)
179 184 self.assertEqual(nbs[0]['name'], 'inspace.ipynb')
180 185
181 186 nbs = notebooks_only(self.api.list(u'/unicodΓ©/').json())
182 187 self.assertEqual(len(nbs), 1)
183 188 self.assertEqual(nbs[0]['name'], 'innonascii.ipynb')
184 189 self.assertEqual(nbs[0]['path'], u'unicodΓ©/innonascii.ipynb')
185 190
186 191 nbs = notebooks_only(self.api.list('/foo/bar/').json())
187 192 self.assertEqual(len(nbs), 1)
188 193 self.assertEqual(nbs[0]['name'], 'baz.ipynb')
189 194 self.assertEqual(nbs[0]['path'], 'foo/bar/baz.ipynb')
190 195
191 196 nbs = notebooks_only(self.api.list('foo').json())
192 197 self.assertEqual(len(nbs), 4)
193 198 nbnames = { normalize('NFC', n['name']) for n in nbs }
194 199 expected = [ u'a.ipynb', u'b.ipynb', u'name with spaces.ipynb', u'unicodΓ©.ipynb']
195 200 expected = { normalize('NFC', name) for name in expected }
196 201 self.assertEqual(nbnames, expected)
197 202
198 203 nbs = notebooks_only(self.api.list('ordering').json())
199 204 nbnames = [n['name'] for n in nbs]
200 205 expected = ['A.ipynb', 'b.ipynb', 'C.ipynb']
201 206 self.assertEqual(nbnames, expected)
202 207
203 208 def test_list_dirs(self):
204 209 print(self.api.list().json())
205 210 dirs = dirs_only(self.api.list().json())
206 211 dir_names = {normalize('NFC', d['name']) for d in dirs}
207 212 print(dir_names)
208 213 print(self.top_level_dirs)
209 214 self.assertEqual(dir_names, self.top_level_dirs) # Excluding hidden dirs
210 215
211 216 def test_list_nonexistant_dir(self):
212 217 with assert_http_error(404):
213 218 self.api.list('nonexistant')
214 219
215 220 def test_get_nb_contents(self):
216 221 for d, name in self.dirs_nbs:
217 222 path = url_path_join(d, name + '.ipynb')
218 223 nb = self.api.read(path).json()
219 224 self.assertEqual(nb['name'], u'%s.ipynb' % name)
220 225 self.assertEqual(nb['path'], path)
221 226 self.assertEqual(nb['type'], 'notebook')
222 227 self.assertIn('content', nb)
223 228 self.assertEqual(nb['format'], 'json')
224 229 self.assertIn('content', nb)
225 230 self.assertIn('metadata', nb['content'])
226 231 self.assertIsInstance(nb['content']['metadata'], dict)
227 232
228 233 def test_get_contents_no_such_file(self):
229 234 # Name that doesn't exist - should be a 404
230 235 with assert_http_error(404):
231 236 self.api.read('foo/q.ipynb')
232 237
233 238 def test_get_text_file_contents(self):
234 239 for d, name in self.dirs_nbs:
235 240 path = url_path_join(d, name + '.txt')
236 241 model = self.api.read(path).json()
237 242 self.assertEqual(model['name'], u'%s.txt' % name)
238 243 self.assertEqual(model['path'], path)
239 244 self.assertIn('content', model)
240 245 self.assertEqual(model['format'], 'text')
241 246 self.assertEqual(model['type'], 'file')
242 247 self.assertEqual(model['content'], self._txt_for_name(name))
243 248
244 249 # Name that doesn't exist - should be a 404
245 250 with assert_http_error(404):
246 251 self.api.read('foo/q.txt')
247 252
253 # Specifying format=text should fail on a non-UTF-8 file
254 with assert_http_error(400):
255 self.api.read('foo/bar/baz.blob', type_='file', format='text')
256
248 257 def test_get_binary_file_contents(self):
249 258 for d, name in self.dirs_nbs:
250 259 path = url_path_join(d, name + '.blob')
251 260 model = self.api.read(path).json()
252 261 self.assertEqual(model['name'], u'%s.blob' % name)
253 262 self.assertEqual(model['path'], path)
254 263 self.assertIn('content', model)
255 264 self.assertEqual(model['format'], 'base64')
256 265 self.assertEqual(model['type'], 'file')
257 266 b64_data = base64.encodestring(self._blob_for_name(name)).decode('ascii')
258 267 self.assertEqual(model['content'], b64_data)
259 268
260 269 # Name that doesn't exist - should be a 404
261 270 with assert_http_error(404):
262 271 self.api.read('foo/q.txt')
263 272
264 273 def test_get_bad_type(self):
265 274 with assert_http_error(400):
266 275 self.api.read(u'unicodΓ©', type_='file') # this is a directory
267 276
268 277 with assert_http_error(400):
269 278 self.api.read(u'unicodΓ©/innonascii.ipynb', type_='directory')
270 279
271 280 def _check_created(self, resp, path, type='notebook'):
272 281 self.assertEqual(resp.status_code, 201)
273 282 location_header = py3compat.str_to_unicode(resp.headers['Location'])
274 283 self.assertEqual(location_header, url_escape(url_path_join(u'/api/contents', path)))
275 284 rjson = resp.json()
276 285 self.assertEqual(rjson['name'], path.rsplit('/', 1)[-1])
277 286 self.assertEqual(rjson['path'], path)
278 287 self.assertEqual(rjson['type'], type)
279 288 isright = os.path.isdir if type == 'directory' else os.path.isfile
280 289 assert isright(pjoin(
281 290 self.notebook_dir.name,
282 291 path.replace('/', os.sep),
283 292 ))
284 293
285 294 def test_create_untitled(self):
286 295 resp = self.api.create_untitled(path=u'Γ₯ b')
287 296 self._check_created(resp, u'Γ₯ b/Untitled0.ipynb')
288 297
289 298 # Second time
290 299 resp = self.api.create_untitled(path=u'Γ₯ b')
291 300 self._check_created(resp, u'Γ₯ b/Untitled1.ipynb')
292 301
293 302 # And two directories down
294 303 resp = self.api.create_untitled(path='foo/bar')
295 304 self._check_created(resp, 'foo/bar/Untitled0.ipynb')
296 305
297 306 def test_create_untitled_txt(self):
298 307 resp = self.api.create_untitled(path='foo/bar', ext='.txt')
299 308 self._check_created(resp, 'foo/bar/untitled0.txt', type='file')
300 309
301 310 resp = self.api.read(path='foo/bar/untitled0.txt')
302 311 model = resp.json()
303 312 self.assertEqual(model['type'], 'file')
304 313 self.assertEqual(model['format'], 'text')
305 314 self.assertEqual(model['content'], '')
306 315
307 316 def test_upload(self):
308 317 nb = new_notebook()
309 318 nbmodel = {'content': nb, 'type': 'notebook'}
310 319 path = u'Γ₯ b/Upload tΓ©st.ipynb'
311 320 resp = self.api.upload(path, body=json.dumps(nbmodel))
312 321 self._check_created(resp, path)
313 322
314 323 def test_mkdir_untitled(self):
315 324 resp = self.api.mkdir_untitled(path=u'Γ₯ b')
316 325 self._check_created(resp, u'Γ₯ b/Untitled Folder0', type='directory')
317 326
318 327 # Second time
319 328 resp = self.api.mkdir_untitled(path=u'Γ₯ b')
320 329 self._check_created(resp, u'Γ₯ b/Untitled Folder1', type='directory')
321 330
322 331 # And two directories down
323 332 resp = self.api.mkdir_untitled(path='foo/bar')
324 333 self._check_created(resp, 'foo/bar/Untitled Folder0', type='directory')
325 334
326 335 def test_mkdir(self):
327 336 path = u'Γ₯ b/New βˆ‚ir'
328 337 resp = self.api.mkdir(path)
329 338 self._check_created(resp, path, type='directory')
330 339
331 340 def test_mkdir_hidden_400(self):
332 341 with assert_http_error(400):
333 342 resp = self.api.mkdir(u'Γ₯ b/.hidden')
334 343
335 344 def test_upload_txt(self):
336 345 body = u'ΓΌnicode tΓ©xt'
337 346 model = {
338 347 'content' : body,
339 348 'format' : 'text',
340 349 'type' : 'file',
341 350 }
342 351 path = u'Γ₯ b/Upload tΓ©st.txt'
343 352 resp = self.api.upload(path, body=json.dumps(model))
344 353
345 354 # check roundtrip
346 355 resp = self.api.read(path)
347 356 model = resp.json()
348 357 self.assertEqual(model['type'], 'file')
349 358 self.assertEqual(model['format'], 'text')
350 359 self.assertEqual(model['content'], body)
351 360
352 361 def test_upload_b64(self):
353 362 body = b'\xFFblob'
354 363 b64body = base64.encodestring(body).decode('ascii')
355 364 model = {
356 365 'content' : b64body,
357 366 'format' : 'base64',
358 367 'type' : 'file',
359 368 }
360 369 path = u'Γ₯ b/Upload tΓ©st.blob'
361 370 resp = self.api.upload(path, body=json.dumps(model))
362 371
363 372 # check roundtrip
364 373 resp = self.api.read(path)
365 374 model = resp.json()
366 375 self.assertEqual(model['type'], 'file')
367 376 self.assertEqual(model['path'], path)
368 377 self.assertEqual(model['format'], 'base64')
369 378 decoded = base64.decodestring(model['content'].encode('ascii'))
370 379 self.assertEqual(decoded, body)
371 380
372 381 def test_upload_v2(self):
373 382 nb = v2.new_notebook()
374 383 ws = v2.new_worksheet()
375 384 nb.worksheets.append(ws)
376 385 ws.cells.append(v2.new_code_cell(input='print("hi")'))
377 386 nbmodel = {'content': nb, 'type': 'notebook'}
378 387 path = u'Γ₯ b/Upload tΓ©st.ipynb'
379 388 resp = self.api.upload(path, body=json.dumps(nbmodel))
380 389 self._check_created(resp, path)
381 390 resp = self.api.read(path)
382 391 data = resp.json()
383 392 self.assertEqual(data['content']['nbformat'], 4)
384 393
385 394 def test_copy(self):
386 395 resp = self.api.copy(u'Γ₯ b/Γ§ d.ipynb', u'unicodΓ©')
387 396 self._check_created(resp, u'unicodΓ©/Γ§ d-Copy0.ipynb')
388 397
389 398 resp = self.api.copy(u'Γ₯ b/Γ§ d.ipynb', u'Γ₯ b')
390 399 self._check_created(resp, u'Γ₯ b/Γ§ d-Copy0.ipynb')
391 400
392 401 def test_copy_path(self):
393 402 resp = self.api.copy(u'foo/a.ipynb', u'Γ₯ b')
394 403 self._check_created(resp, u'Γ₯ b/a-Copy0.ipynb')
395 404
396 405 def test_copy_put_400(self):
397 406 with assert_http_error(400):
398 407 resp = self.api.copy_put(u'Γ₯ b/Γ§ d.ipynb', u'Γ₯ b/cΓΈpy.ipynb')
399 408
400 409 def test_copy_dir_400(self):
401 410 # can't copy directories
402 411 with assert_http_error(400):
403 412 resp = self.api.copy(u'Γ₯ b', u'foo')
404 413
405 414 def test_delete(self):
406 415 for d, name in self.dirs_nbs:
407 416 print('%r, %r' % (d, name))
408 417 resp = self.api.delete(url_path_join(d, name + '.ipynb'))
409 418 self.assertEqual(resp.status_code, 204)
410 419
411 420 for d in self.dirs + ['/']:
412 421 nbs = notebooks_only(self.api.list(d).json())
413 422 print('------')
414 423 print(d)
415 424 print(nbs)
416 425 self.assertEqual(nbs, [])
417 426
418 427 def test_delete_dirs(self):
419 428 # depth-first delete everything, so we don't try to delete empty directories
420 429 for name in sorted(self.dirs + ['/'], key=len, reverse=True):
421 430 listing = self.api.list(name).json()['content']
422 431 for model in listing:
423 432 self.api.delete(model['path'])
424 433 listing = self.api.list('/').json()['content']
425 434 self.assertEqual(listing, [])
426 435
427 436 def test_delete_non_empty_dir(self):
428 437 """delete non-empty dir raises 400"""
429 438 with assert_http_error(400):
430 439 self.api.delete(u'Γ₯ b')
431 440
432 441 def test_rename(self):
433 442 resp = self.api.rename('foo/a.ipynb', 'foo/z.ipynb')
434 443 self.assertEqual(resp.headers['Location'].split('/')[-1], 'z.ipynb')
435 444 self.assertEqual(resp.json()['name'], 'z.ipynb')
436 445 self.assertEqual(resp.json()['path'], 'foo/z.ipynb')
437 446 assert os.path.isfile(pjoin(self.notebook_dir.name, 'foo', 'z.ipynb'))
438 447
439 448 nbs = notebooks_only(self.api.list('foo').json())
440 449 nbnames = set(n['name'] for n in nbs)
441 450 self.assertIn('z.ipynb', nbnames)
442 451 self.assertNotIn('a.ipynb', nbnames)
443 452
444 453 def test_rename_existing(self):
445 454 with assert_http_error(409):
446 455 self.api.rename('foo/a.ipynb', 'foo/b.ipynb')
447 456
448 457 def test_save(self):
449 458 resp = self.api.read('foo/a.ipynb')
450 459 nbcontent = json.loads(resp.text)['content']
451 460 nb = from_dict(nbcontent)
452 461 nb.cells.append(new_markdown_cell(u'Created by test Β³'))
453 462
454 463 nbmodel= {'content': nb, 'type': 'notebook'}
455 464 resp = self.api.save('foo/a.ipynb', body=json.dumps(nbmodel))
456 465
457 466 nbfile = pjoin(self.notebook_dir.name, 'foo', 'a.ipynb')
458 467 with io.open(nbfile, 'r', encoding='utf-8') as f:
459 468 newnb = read(f, as_version=4)
460 469 self.assertEqual(newnb.cells[0].source,
461 470 u'Created by test Β³')
462 471 nbcontent = self.api.read('foo/a.ipynb').json()['content']
463 472 newnb = from_dict(nbcontent)
464 473 self.assertEqual(newnb.cells[0].source,
465 474 u'Created by test Β³')
466 475
467 476
468 477 def test_checkpoints(self):
469 478 resp = self.api.read('foo/a.ipynb')
470 479 r = self.api.new_checkpoint('foo/a.ipynb')
471 480 self.assertEqual(r.status_code, 201)
472 481 cp1 = r.json()
473 482 self.assertEqual(set(cp1), {'id', 'last_modified'})
474 483 self.assertEqual(r.headers['Location'].split('/')[-1], cp1['id'])
475 484
476 485 # Modify it
477 486 nbcontent = json.loads(resp.text)['content']
478 487 nb = from_dict(nbcontent)
479 488 hcell = new_markdown_cell('Created by test')
480 489 nb.cells.append(hcell)
481 490 # Save
482 491 nbmodel= {'content': nb, 'type': 'notebook'}
483 492 resp = self.api.save('foo/a.ipynb', body=json.dumps(nbmodel))
484 493
485 494 # List checkpoints
486 495 cps = self.api.get_checkpoints('foo/a.ipynb').json()
487 496 self.assertEqual(cps, [cp1])
488 497
489 498 nbcontent = self.api.read('foo/a.ipynb').json()['content']
490 499 nb = from_dict(nbcontent)
491 500 self.assertEqual(nb.cells[0].source, 'Created by test')
492 501
493 502 # Restore cp1
494 503 r = self.api.restore_checkpoint('foo/a.ipynb', cp1['id'])
495 504 self.assertEqual(r.status_code, 204)
496 505 nbcontent = self.api.read('foo/a.ipynb').json()['content']
497 506 nb = from_dict(nbcontent)
498 507 self.assertEqual(nb.cells, [])
499 508
500 509 # Delete cp1
501 510 r = self.api.delete_checkpoint('foo/a.ipynb', cp1['id'])
502 511 self.assertEqual(r.status_code, 204)
503 512 cps = self.api.get_checkpoints('foo/a.ipynb').json()
504 513 self.assertEqual(cps, [])
@@ -1,362 +1,365 b''
1 1 # coding: utf-8
2 2 """Tests for the notebook manager."""
3 3 from __future__ import print_function
4 4
5 5 import logging
6 6 import os
7 7
8 8 from tornado.web import HTTPError
9 9 from unittest import TestCase
10 10 from tempfile import NamedTemporaryFile
11 11
12 12 from IPython.nbformat import v4 as nbformat
13 13
14 14 from IPython.utils.tempdir import TemporaryDirectory
15 15 from IPython.utils.traitlets import TraitError
16 16 from IPython.html.utils import url_path_join
17 17 from IPython.testing import decorators as dec
18 18
19 19 from ..filemanager import FileContentsManager
20 20 from ..manager import ContentsManager
21 21
22 22
23 23 class TestFileContentsManager(TestCase):
24 24
25 25 def test_root_dir(self):
26 26 with TemporaryDirectory() as td:
27 27 fm = FileContentsManager(root_dir=td)
28 28 self.assertEqual(fm.root_dir, td)
29 29
30 30 def test_missing_root_dir(self):
31 31 with TemporaryDirectory() as td:
32 32 root = os.path.join(td, 'notebook', 'dir', 'is', 'missing')
33 33 self.assertRaises(TraitError, FileContentsManager, root_dir=root)
34 34
35 35 def test_invalid_root_dir(self):
36 36 with NamedTemporaryFile() as tf:
37 37 self.assertRaises(TraitError, FileContentsManager, root_dir=tf.name)
38 38
39 39 def test_get_os_path(self):
40 40 # full filesystem path should be returned with correct operating system
41 41 # separators.
42 42 with TemporaryDirectory() as td:
43 43 root = td
44 44 fm = FileContentsManager(root_dir=root)
45 45 path = fm._get_os_path('/path/to/notebook/test.ipynb')
46 46 rel_path_list = '/path/to/notebook/test.ipynb'.split('/')
47 47 fs_path = os.path.join(fm.root_dir, *rel_path_list)
48 48 self.assertEqual(path, fs_path)
49 49
50 50 fm = FileContentsManager(root_dir=root)
51 51 path = fm._get_os_path('test.ipynb')
52 52 fs_path = os.path.join(fm.root_dir, 'test.ipynb')
53 53 self.assertEqual(path, fs_path)
54 54
55 55 fm = FileContentsManager(root_dir=root)
56 56 path = fm._get_os_path('////test.ipynb')
57 57 fs_path = os.path.join(fm.root_dir, 'test.ipynb')
58 58 self.assertEqual(path, fs_path)
59 59
60 60 def test_checkpoint_subdir(self):
61 61 subd = u'sub βˆ‚ir'
62 62 cp_name = 'test-cp.ipynb'
63 63 with TemporaryDirectory() as td:
64 64 root = td
65 65 os.mkdir(os.path.join(td, subd))
66 66 fm = FileContentsManager(root_dir=root)
67 67 cp_dir = fm.get_checkpoint_path('cp', 'test.ipynb')
68 68 cp_subdir = fm.get_checkpoint_path('cp', '/%s/test.ipynb' % subd)
69 69 self.assertNotEqual(cp_dir, cp_subdir)
70 70 self.assertEqual(cp_dir, os.path.join(root, fm.checkpoint_dir, cp_name))
71 71 self.assertEqual(cp_subdir, os.path.join(root, subd, fm.checkpoint_dir, cp_name))
72 72
73 73
74 74 class TestContentsManager(TestCase):
75 75
76 76 def setUp(self):
77 77 self._temp_dir = TemporaryDirectory()
78 78 self.td = self._temp_dir.name
79 79 self.contents_manager = FileContentsManager(
80 80 root_dir=self.td,
81 81 log=logging.getLogger()
82 82 )
83 83
84 84 def tearDown(self):
85 85 self._temp_dir.cleanup()
86 86
87 87 def make_dir(self, abs_path, rel_path):
88 88 """make subdirectory, rel_path is the relative path
89 89 to that directory from the location where the server started"""
90 90 os_path = os.path.join(abs_path, rel_path)
91 91 try:
92 92 os.makedirs(os_path)
93 93 except OSError:
94 94 print("Directory already exists: %r" % os_path)
95 95 return os_path
96 96
97 97 def add_code_cell(self, nb):
98 98 output = nbformat.new_output("display_data", {'application/javascript': "alert('hi');"})
99 99 cell = nbformat.new_code_cell("print('hi')", outputs=[output])
100 100 nb.cells.append(cell)
101 101
102 102 def new_notebook(self):
103 103 cm = self.contents_manager
104 104 model = cm.new_untitled(type='notebook')
105 105 name = model['name']
106 106 path = model['path']
107 107
108 108 full_model = cm.get_model(path)
109 109 nb = full_model['content']
110 110 self.add_code_cell(nb)
111 111
112 112 cm.save(full_model, path)
113 113 return nb, name, path
114 114
115 115 def test_new_untitled(self):
116 116 cm = self.contents_manager
117 117 # Test in root directory
118 118 model = cm.new_untitled(type='notebook')
119 119 assert isinstance(model, dict)
120 120 self.assertIn('name', model)
121 121 self.assertIn('path', model)
122 122 self.assertIn('type', model)
123 123 self.assertEqual(model['type'], 'notebook')
124 124 self.assertEqual(model['name'], 'Untitled0.ipynb')
125 125 self.assertEqual(model['path'], 'Untitled0.ipynb')
126 126
127 127 # Test in sub-directory
128 128 model = cm.new_untitled(type='directory')
129 129 assert isinstance(model, dict)
130 130 self.assertIn('name', model)
131 131 self.assertIn('path', model)
132 132 self.assertIn('type', model)
133 133 self.assertEqual(model['type'], 'directory')
134 134 self.assertEqual(model['name'], 'Untitled Folder0')
135 135 self.assertEqual(model['path'], 'Untitled Folder0')
136 136 sub_dir = model['path']
137 137
138 138 model = cm.new_untitled(path=sub_dir)
139 139 assert isinstance(model, dict)
140 140 self.assertIn('name', model)
141 141 self.assertIn('path', model)
142 142 self.assertIn('type', model)
143 143 self.assertEqual(model['type'], 'file')
144 144 self.assertEqual(model['name'], 'untitled0')
145 145 self.assertEqual(model['path'], '%s/untitled0' % sub_dir)
146 146
147 147 def test_get(self):
148 148 cm = self.contents_manager
149 149 # Create a notebook
150 150 model = cm.new_untitled(type='notebook')
151 151 name = model['name']
152 152 path = model['path']
153 153
154 154 # Check that we 'get' on the notebook we just created
155 155 model2 = cm.get_model(path)
156 156 assert isinstance(model2, dict)
157 157 self.assertIn('name', model2)
158 158 self.assertIn('path', model2)
159 159 self.assertEqual(model['name'], name)
160 160 self.assertEqual(model['path'], path)
161 161
162 162 nb_as_file = cm.get_model(path, content=True, type_='file')
163 163 self.assertEqual(nb_as_file['path'], path)
164 164 self.assertEqual(nb_as_file['type'], 'file')
165 165 self.assertEqual(nb_as_file['format'], 'text')
166 166 self.assertNotIsInstance(nb_as_file['content'], dict)
167 167
168 nb_as_bin_file = cm.get_model(path, content=True, type_='file', format='base64')
169 self.assertEqual(nb_as_bin_file['format'], 'base64')
170
168 171 # Test in sub-directory
169 172 sub_dir = '/foo/'
170 173 self.make_dir(cm.root_dir, 'foo')
171 174 model = cm.new_untitled(path=sub_dir, ext='.ipynb')
172 175 model2 = cm.get_model(sub_dir + name)
173 176 assert isinstance(model2, dict)
174 177 self.assertIn('name', model2)
175 178 self.assertIn('path', model2)
176 179 self.assertIn('content', model2)
177 180 self.assertEqual(model2['name'], 'Untitled0.ipynb')
178 181 self.assertEqual(model2['path'], '{0}/{1}'.format(sub_dir.strip('/'), name))
179 182
180 183 # Test getting directory model
181 184 dirmodel = cm.get_model('foo')
182 185 self.assertEqual(dirmodel['type'], 'directory')
183 186
184 187 with self.assertRaises(HTTPError):
185 188 cm.get_model('foo', type_='file')
186 189
187 190
188 191 @dec.skip_win32
189 192 def test_bad_symlink(self):
190 193 cm = self.contents_manager
191 194 path = 'test bad symlink'
192 195 os_path = self.make_dir(cm.root_dir, path)
193 196
194 197 file_model = cm.new_untitled(path=path, ext='.txt')
195 198
196 199 # create a broken symlink
197 200 os.symlink("target", os.path.join(os_path, "bad symlink"))
198 201 model = cm.get_model(path)
199 202 self.assertEqual(model['content'], [file_model])
200 203
201 204 @dec.skip_win32
202 205 def test_good_symlink(self):
203 206 cm = self.contents_manager
204 207 parent = 'test good symlink'
205 208 name = 'good symlink'
206 209 path = '{0}/{1}'.format(parent, name)
207 210 os_path = self.make_dir(cm.root_dir, parent)
208 211
209 212 file_model = cm.new(path=parent + '/zfoo.txt')
210 213
211 214 # create a good symlink
212 215 os.symlink(file_model['name'], os.path.join(os_path, name))
213 216 symlink_model = cm.get_model(path, content=False)
214 217 dir_model = cm.get_model(parent)
215 218 self.assertEqual(
216 219 sorted(dir_model['content'], key=lambda x: x['name']),
217 220 [symlink_model, file_model],
218 221 )
219 222
220 223 def test_update(self):
221 224 cm = self.contents_manager
222 225 # Create a notebook
223 226 model = cm.new_untitled(type='notebook')
224 227 name = model['name']
225 228 path = model['path']
226 229
227 230 # Change the name in the model for rename
228 231 model['path'] = 'test.ipynb'
229 232 model = cm.update(model, path)
230 233 assert isinstance(model, dict)
231 234 self.assertIn('name', model)
232 235 self.assertIn('path', model)
233 236 self.assertEqual(model['name'], 'test.ipynb')
234 237
235 238 # Make sure the old name is gone
236 239 self.assertRaises(HTTPError, cm.get_model, path)
237 240
238 241 # Test in sub-directory
239 242 # Create a directory and notebook in that directory
240 243 sub_dir = '/foo/'
241 244 self.make_dir(cm.root_dir, 'foo')
242 245 model = cm.new_untitled(path=sub_dir, type='notebook')
243 246 name = model['name']
244 247 path = model['path']
245 248
246 249 # Change the name in the model for rename
247 250 d = path.rsplit('/', 1)[0]
248 251 new_path = model['path'] = d + '/test_in_sub.ipynb'
249 252 model = cm.update(model, path)
250 253 assert isinstance(model, dict)
251 254 self.assertIn('name', model)
252 255 self.assertIn('path', model)
253 256 self.assertEqual(model['name'], 'test_in_sub.ipynb')
254 257 self.assertEqual(model['path'], new_path)
255 258
256 259 # Make sure the old name is gone
257 260 self.assertRaises(HTTPError, cm.get_model, path)
258 261
259 262 def test_save(self):
260 263 cm = self.contents_manager
261 264 # Create a notebook
262 265 model = cm.new_untitled(type='notebook')
263 266 name = model['name']
264 267 path = model['path']
265 268
266 269 # Get the model with 'content'
267 270 full_model = cm.get_model(path)
268 271
269 272 # Save the notebook
270 273 model = cm.save(full_model, path)
271 274 assert isinstance(model, dict)
272 275 self.assertIn('name', model)
273 276 self.assertIn('path', model)
274 277 self.assertEqual(model['name'], name)
275 278 self.assertEqual(model['path'], path)
276 279
277 280 # Test in sub-directory
278 281 # Create a directory and notebook in that directory
279 282 sub_dir = '/foo/'
280 283 self.make_dir(cm.root_dir, 'foo')
281 284 model = cm.new_untitled(path=sub_dir, type='notebook')
282 285 name = model['name']
283 286 path = model['path']
284 287 model = cm.get_model(path)
285 288
286 289 # Change the name in the model for rename
287 290 model = cm.save(model, path)
288 291 assert isinstance(model, dict)
289 292 self.assertIn('name', model)
290 293 self.assertIn('path', model)
291 294 self.assertEqual(model['name'], 'Untitled0.ipynb')
292 295 self.assertEqual(model['path'], 'foo/Untitled0.ipynb')
293 296
294 297 def test_delete(self):
295 298 cm = self.contents_manager
296 299 # Create a notebook
297 300 nb, name, path = self.new_notebook()
298 301
299 302 # Delete the notebook
300 303 cm.delete(path)
301 304
302 305 # Check that a 'get' on the deleted notebook raises and error
303 306 self.assertRaises(HTTPError, cm.get_model, path)
304 307
305 308 def test_copy(self):
306 309 cm = self.contents_manager
307 310 parent = u'Γ₯ b'
308 311 name = u'nb √.ipynb'
309 312 path = u'{0}/{1}'.format(parent, name)
310 313 os.mkdir(os.path.join(cm.root_dir, parent))
311 314 orig = cm.new(path=path)
312 315
313 316 # copy with unspecified name
314 317 copy = cm.copy(path)
315 318 self.assertEqual(copy['name'], orig['name'].replace('.ipynb', '-Copy0.ipynb'))
316 319
317 320 # copy with specified name
318 321 copy2 = cm.copy(path, u'Γ₯ b/copy 2.ipynb')
319 322 self.assertEqual(copy2['name'], u'copy 2.ipynb')
320 323 self.assertEqual(copy2['path'], u'Γ₯ b/copy 2.ipynb')
321 324
322 325 def test_trust_notebook(self):
323 326 cm = self.contents_manager
324 327 nb, name, path = self.new_notebook()
325 328
326 329 untrusted = cm.get_model(path)['content']
327 330 assert not cm.notary.check_cells(untrusted)
328 331
329 332 # print(untrusted)
330 333 cm.trust_notebook(path)
331 334 trusted = cm.get_model(path)['content']
332 335 # print(trusted)
333 336 assert cm.notary.check_cells(trusted)
334 337
335 338 def test_mark_trusted_cells(self):
336 339 cm = self.contents_manager
337 340 nb, name, path = self.new_notebook()
338 341
339 342 cm.mark_trusted_cells(nb, path)
340 343 for cell in nb.cells:
341 344 if cell.cell_type == 'code':
342 345 assert not cell.metadata.trusted
343 346
344 347 cm.trust_notebook(path)
345 348 nb = cm.get_model(path)['content']
346 349 for cell in nb.cells:
347 350 if cell.cell_type == 'code':
348 351 assert cell.metadata.trusted
349 352
350 353 def test_check_and_sign(self):
351 354 cm = self.contents_manager
352 355 nb, name, path = self.new_notebook()
353 356
354 357 cm.mark_trusted_cells(nb, path)
355 358 cm.check_and_sign(nb, path)
356 359 assert not cm.notary.check_signature(nb)
357 360
358 361 cm.trust_notebook(path)
359 362 nb = cm.get_model(path)['content']
360 363 cm.mark_trusted_cells(nb, path)
361 364 cm.check_and_sign(nb, path)
362 365 assert cm.notary.check_signature(nb)
General Comments 0
You need to be logged in to leave comments. Login now