##// END OF EJS Templates
Add type parameter for contents GET requests
Thomas Kluyver -
Show More
@@ -1,534 +1,545 b''
1 1 """A contents manager that uses the local file system for storage."""
2 2
3 3 # Copyright (c) IPython Development Team.
4 4 # Distributed under the terms of the Modified BSD License.
5 5
6 6 import base64
7 7 import io
8 8 import os
9 9 import glob
10 10 import shutil
11 11
12 12 from tornado import web
13 13
14 14 from .manager import ContentsManager
15 15 from IPython import nbformat
16 16 from IPython.utils.io import atomic_writing
17 17 from IPython.utils.path import ensure_dir_exists
18 18 from IPython.utils.traitlets import Unicode, Bool, TraitError
19 19 from IPython.utils.py3compat import getcwd
20 20 from IPython.utils import tz
21 21 from IPython.html.utils import is_hidden, to_os_path, url_path_join
22 22
23 23
24 24 class FileContentsManager(ContentsManager):
25 25
26 26 root_dir = Unicode(getcwd(), config=True)
27 27
28 28 save_script = Bool(False, config=True, help='DEPRECATED, IGNORED')
29 29 def _save_script_changed(self):
30 30 self.log.warn("""
31 31 Automatically saving notebooks as scripts has been removed.
32 32 Use `ipython nbconvert --to python [notebook]` instead.
33 33 """)
34 34
35 35 def _root_dir_changed(self, name, old, new):
36 36 """Do a bit of validation of the root_dir."""
37 37 if not os.path.isabs(new):
38 38 # If we receive a non-absolute path, make it absolute.
39 39 self.root_dir = os.path.abspath(new)
40 40 return
41 41 if not os.path.isdir(new):
42 42 raise TraitError("%r is not a directory" % new)
43 43
44 44 checkpoint_dir = Unicode('.ipynb_checkpoints', config=True,
45 45 help="""The directory name in which to keep file checkpoints
46 46
47 47 This is a path relative to the file's own directory.
48 48
49 49 By default, it is .ipynb_checkpoints
50 50 """
51 51 )
52 52
53 53 def _copy(self, src, dest):
54 54 """copy src to dest
55 55
56 56 like shutil.copy2, but log errors in copystat
57 57 """
58 58 shutil.copyfile(src, dest)
59 59 try:
60 60 shutil.copystat(src, dest)
61 61 except OSError as e:
62 62 self.log.debug("copystat on %s failed", dest, exc_info=True)
63 63
64 64 def _get_os_path(self, path):
65 65 """Given an API path, return its file system path.
66 66
67 67 Parameters
68 68 ----------
69 69 path : string
70 70 The relative API path to the named file.
71 71
72 72 Returns
73 73 -------
74 74 path : string
75 75 Native, absolute OS path to for a file.
76 76 """
77 77 return to_os_path(path, self.root_dir)
78 78
79 79 def dir_exists(self, path):
80 80 """Does the API-style path refer to an extant directory?
81 81
82 82 API-style wrapper for os.path.isdir
83 83
84 84 Parameters
85 85 ----------
86 86 path : string
87 87 The path to check. This is an API path (`/` separated,
88 88 relative to root_dir).
89 89
90 90 Returns
91 91 -------
92 92 exists : bool
93 93 Whether the path is indeed a directory.
94 94 """
95 95 path = path.strip('/')
96 96 os_path = self._get_os_path(path=path)
97 97 return os.path.isdir(os_path)
98 98
99 99 def is_hidden(self, path):
100 100 """Does the API style path correspond to a hidden directory or file?
101 101
102 102 Parameters
103 103 ----------
104 104 path : string
105 105 The path to check. This is an API path (`/` separated,
106 106 relative to root_dir).
107 107
108 108 Returns
109 109 -------
110 110 hidden : bool
111 111 Whether the path exists and is hidden.
112 112 """
113 113 path = path.strip('/')
114 114 os_path = self._get_os_path(path=path)
115 115 return is_hidden(os_path, self.root_dir)
116 116
117 117 def file_exists(self, path):
118 118 """Returns True if the file exists, else returns False.
119 119
120 120 API-style wrapper for os.path.isfile
121 121
122 122 Parameters
123 123 ----------
124 124 path : string
125 125 The relative path to the file (with '/' as separator)
126 126
127 127 Returns
128 128 -------
129 129 exists : bool
130 130 Whether the file exists.
131 131 """
132 132 path = path.strip('/')
133 133 os_path = self._get_os_path(path)
134 134 return os.path.isfile(os_path)
135 135
136 136 def exists(self, path):
137 137 """Returns True if the path exists, else returns False.
138 138
139 139 API-style wrapper for os.path.exists
140 140
141 141 Parameters
142 142 ----------
143 143 path : string
144 144 The API path to the file (with '/' as separator)
145 145
146 146 Returns
147 147 -------
148 148 exists : bool
149 149 Whether the target exists.
150 150 """
151 151 path = path.strip('/')
152 152 os_path = self._get_os_path(path=path)
153 153 return os.path.exists(os_path)
154 154
155 155 def _base_model(self, path):
156 156 """Build the common base of a contents model"""
157 157 os_path = self._get_os_path(path)
158 158 info = os.stat(os_path)
159 159 last_modified = tz.utcfromtimestamp(info.st_mtime)
160 160 created = tz.utcfromtimestamp(info.st_ctime)
161 161 # Create the base model.
162 162 model = {}
163 163 model['name'] = path.rsplit('/', 1)[-1]
164 164 model['path'] = path
165 165 model['last_modified'] = last_modified
166 166 model['created'] = created
167 167 model['content'] = None
168 168 model['format'] = None
169 169 return model
170 170
171 171 def _dir_model(self, path, content=True):
172 172 """Build a model for a directory
173 173
174 174 if content is requested, will include a listing of the directory
175 175 """
176 176 os_path = self._get_os_path(path)
177 177
178 178 four_o_four = u'directory does not exist: %r' % os_path
179 179
180 180 if not os.path.isdir(os_path):
181 181 raise web.HTTPError(404, four_o_four)
182 182 elif is_hidden(os_path, self.root_dir):
183 183 self.log.info("Refusing to serve hidden directory %r, via 404 Error",
184 184 os_path
185 185 )
186 186 raise web.HTTPError(404, four_o_four)
187 187
188 188 model = self._base_model(path)
189 189 model['type'] = 'directory'
190 190 if content:
191 191 model['content'] = contents = []
192 192 os_dir = self._get_os_path(path)
193 193 for name in os.listdir(os_dir):
194 194 os_path = os.path.join(os_dir, name)
195 195 # skip over broken symlinks in listing
196 196 if not os.path.exists(os_path):
197 197 self.log.warn("%s doesn't exist", os_path)
198 198 continue
199 199 elif not os.path.isfile(os_path) and not os.path.isdir(os_path):
200 200 self.log.debug("%s not a regular file", os_path)
201 201 continue
202 202 if self.should_list(name) and not is_hidden(os_path, self.root_dir):
203 203 contents.append(self.get_model(
204 204 path='%s/%s' % (path, name),
205 205 content=False)
206 206 )
207 207
208 208 model['format'] = 'json'
209 209
210 210 return model
211 211
212 212 def _file_model(self, path, content=True):
213 213 """Build a model for a file
214 214
215 215 if content is requested, include the file contents.
216 216 UTF-8 text files will be unicode, binary files will be base64-encoded.
217 217 """
218 218 model = self._base_model(path)
219 219 model['type'] = 'file'
220 220 if content:
221 221 os_path = self._get_os_path(path)
222 222 if not os.path.isfile(os_path):
223 223 # could be FIFO
224 224 raise web.HTTPError(400, "Cannot get content of non-file %s" % os_path)
225 225 with io.open(os_path, 'rb') as f:
226 226 bcontent = f.read()
227 227 try:
228 228 model['content'] = bcontent.decode('utf8')
229 229 except UnicodeError as e:
230 230 model['content'] = base64.encodestring(bcontent).decode('ascii')
231 231 model['format'] = 'base64'
232 232 else:
233 233 model['format'] = 'text'
234 234 return model
235 235
236 236
237 237 def _notebook_model(self, path, content=True):
238 238 """Build a notebook model
239 239
240 240 if content is requested, the notebook content will be populated
241 241 as a JSON structure (not double-serialized)
242 242 """
243 243 model = self._base_model(path)
244 244 model['type'] = 'notebook'
245 245 if content:
246 246 os_path = self._get_os_path(path)
247 247 with io.open(os_path, 'r', encoding='utf-8') as f:
248 248 try:
249 249 nb = nbformat.read(f, as_version=4)
250 250 except Exception as e:
251 251 raise web.HTTPError(400, u"Unreadable Notebook: %s %r" % (os_path, e))
252 252 self.mark_trusted_cells(nb, path)
253 253 model['content'] = nb
254 254 model['format'] = 'json'
255 255 self.validate_notebook_model(model)
256 256 return model
257 257
258 def get_model(self, path, content=True):
258 def get_model(self, path, content=True, type_=None):
259 259 """ Takes a path for an entity and returns its model
260 260
261 261 Parameters
262 262 ----------
263 263 path : str
264 264 the API path that describes the relative path for the target
265 content : bool
266 Whether to include the contents in the reply
267 type_ : str, optional
268 The requested type - 'file', 'notebook', or 'directory'.
269 Will raise HTTPError 406 if the content doesn't match.
265 270
266 271 Returns
267 272 -------
268 273 model : dict
269 274 the contents model. If content=True, returns the contents
270 275 of the file or directory as well.
271 276 """
272 277 path = path.strip('/')
273 278
274 279 if not self.exists(path):
275 280 raise web.HTTPError(404, u'No such file or directory: %s' % path)
276 281
277 282 os_path = self._get_os_path(path)
278 283 if os.path.isdir(os_path):
284 if type_ not in (None, 'directory'):
285 raise web.HTTPError(400,
286 u'%s is a directory, not a %s' % (path, type_))
279 287 model = self._dir_model(path, content=content)
280 elif path.endswith('.ipynb'):
288 elif type_ == 'notebook' or (type_ is None and path.endswith('.ipynb')):
281 289 model = self._notebook_model(path, content=content)
282 290 else:
291 if type_ == 'directory':
292 raise web.HTTPError(400,
293 u'%s is not a directory')
283 294 model = self._file_model(path, content=content)
284 295 return model
285 296
286 297 def _save_notebook(self, os_path, model, path=''):
287 298 """save a notebook file"""
288 299 # Save the notebook file
289 300 nb = nbformat.from_dict(model['content'])
290 301
291 302 self.check_and_sign(nb, path)
292 303
293 304 with atomic_writing(os_path, encoding='utf-8') as f:
294 305 nbformat.write(nb, f, version=nbformat.NO_CONVERT)
295 306
296 307 def _save_file(self, os_path, model, path=''):
297 308 """save a non-notebook file"""
298 309 fmt = model.get('format', None)
299 310 if fmt not in {'text', 'base64'}:
300 311 raise web.HTTPError(400, "Must specify format of file contents as 'text' or 'base64'")
301 312 try:
302 313 content = model['content']
303 314 if fmt == 'text':
304 315 bcontent = content.encode('utf8')
305 316 else:
306 317 b64_bytes = content.encode('ascii')
307 318 bcontent = base64.decodestring(b64_bytes)
308 319 except Exception as e:
309 320 raise web.HTTPError(400, u'Encoding error saving %s: %s' % (os_path, e))
310 321 with atomic_writing(os_path, text=False) as f:
311 322 f.write(bcontent)
312 323
313 324 def _save_directory(self, os_path, model, path=''):
314 325 """create a directory"""
315 326 if is_hidden(os_path, self.root_dir):
316 327 raise web.HTTPError(400, u'Cannot create hidden directory %r' % os_path)
317 328 if not os.path.exists(os_path):
318 329 os.mkdir(os_path)
319 330 elif not os.path.isdir(os_path):
320 331 raise web.HTTPError(400, u'Not a directory: %s' % (os_path))
321 332 else:
322 333 self.log.debug("Directory %r already exists", os_path)
323 334
324 335 def save(self, model, path=''):
325 336 """Save the file model and return the model with no content."""
326 337 path = path.strip('/')
327 338
328 339 if 'type' not in model:
329 340 raise web.HTTPError(400, u'No file type provided')
330 341 if 'content' not in model and model['type'] != 'directory':
331 342 raise web.HTTPError(400, u'No file content provided')
332 343
333 344 # One checkpoint should always exist
334 345 if self.file_exists(path) and not self.list_checkpoints(path):
335 346 self.create_checkpoint(path)
336 347
337 348 os_path = self._get_os_path(path)
338 349 self.log.debug("Saving %s", os_path)
339 350 try:
340 351 if model['type'] == 'notebook':
341 352 self._save_notebook(os_path, model, path)
342 353 elif model['type'] == 'file':
343 354 self._save_file(os_path, model, path)
344 355 elif model['type'] == 'directory':
345 356 self._save_directory(os_path, model, path)
346 357 else:
347 358 raise web.HTTPError(400, "Unhandled contents type: %s" % model['type'])
348 359 except web.HTTPError:
349 360 raise
350 361 except Exception as e:
351 362 raise web.HTTPError(400, u'Unexpected error while saving file: %s %s' % (os_path, e))
352 363
353 364 validation_message = None
354 365 if model['type'] == 'notebook':
355 366 self.validate_notebook_model(model)
356 367 validation_message = model.get('message', None)
357 368
358 369 model = self.get_model(path, content=False)
359 370 if validation_message:
360 371 model['message'] = validation_message
361 372 return model
362 373
363 374 def update(self, model, path):
364 375 """Update the file's path
365 376
366 377 For use in PATCH requests, to enable renaming a file without
367 378 re-uploading its contents. Only used for renaming at the moment.
368 379 """
369 380 path = path.strip('/')
370 381 new_path = model.get('path', path).strip('/')
371 382 if path != new_path:
372 383 self.rename(path, new_path)
373 384 model = self.get_model(new_path, content=False)
374 385 return model
375 386
376 387 def delete(self, path):
377 388 """Delete file at path."""
378 389 path = path.strip('/')
379 390 os_path = self._get_os_path(path)
380 391 rm = os.unlink
381 392 if os.path.isdir(os_path):
382 393 listing = os.listdir(os_path)
383 394 # don't delete non-empty directories (checkpoints dir doesn't count)
384 395 if listing and listing != [self.checkpoint_dir]:
385 396 raise web.HTTPError(400, u'Directory %s not empty' % os_path)
386 397 elif not os.path.isfile(os_path):
387 398 raise web.HTTPError(404, u'File does not exist: %s' % os_path)
388 399
389 400 # clear checkpoints
390 401 for checkpoint in self.list_checkpoints(path):
391 402 checkpoint_id = checkpoint['id']
392 403 cp_path = self.get_checkpoint_path(checkpoint_id, path)
393 404 if os.path.isfile(cp_path):
394 405 self.log.debug("Unlinking checkpoint %s", cp_path)
395 406 os.unlink(cp_path)
396 407
397 408 if os.path.isdir(os_path):
398 409 self.log.debug("Removing directory %s", os_path)
399 410 shutil.rmtree(os_path)
400 411 else:
401 412 self.log.debug("Unlinking file %s", os_path)
402 413 rm(os_path)
403 414
404 415 def rename(self, old_path, new_path):
405 416 """Rename a file."""
406 417 old_path = old_path.strip('/')
407 418 new_path = new_path.strip('/')
408 419 if new_path == old_path:
409 420 return
410 421
411 422 new_os_path = self._get_os_path(new_path)
412 423 old_os_path = self._get_os_path(old_path)
413 424
414 425 # Should we proceed with the move?
415 426 if os.path.exists(new_os_path):
416 427 raise web.HTTPError(409, u'File already exists: %s' % new_path)
417 428
418 429 # Move the file
419 430 try:
420 431 shutil.move(old_os_path, new_os_path)
421 432 except Exception as e:
422 433 raise web.HTTPError(500, u'Unknown error renaming file: %s %s' % (old_path, e))
423 434
424 435 # Move the checkpoints
425 436 old_checkpoints = self.list_checkpoints(old_path)
426 437 for cp in old_checkpoints:
427 438 checkpoint_id = cp['id']
428 439 old_cp_path = self.get_checkpoint_path(checkpoint_id, old_path)
429 440 new_cp_path = self.get_checkpoint_path(checkpoint_id, new_path)
430 441 if os.path.isfile(old_cp_path):
431 442 self.log.debug("Renaming checkpoint %s -> %s", old_cp_path, new_cp_path)
432 443 shutil.move(old_cp_path, new_cp_path)
433 444
434 445 # Checkpoint-related utilities
435 446
436 447 def get_checkpoint_path(self, checkpoint_id, path):
437 448 """find the path to a checkpoint"""
438 449 path = path.strip('/')
439 450 parent, name = ('/' + path).rsplit('/', 1)
440 451 parent = parent.strip('/')
441 452 basename, ext = os.path.splitext(name)
442 453 filename = u"{name}-{checkpoint_id}{ext}".format(
443 454 name=basename,
444 455 checkpoint_id=checkpoint_id,
445 456 ext=ext,
446 457 )
447 458 os_path = self._get_os_path(path=parent)
448 459 cp_dir = os.path.join(os_path, self.checkpoint_dir)
449 460 ensure_dir_exists(cp_dir)
450 461 cp_path = os.path.join(cp_dir, filename)
451 462 return cp_path
452 463
453 464 def get_checkpoint_model(self, checkpoint_id, path):
454 465 """construct the info dict for a given checkpoint"""
455 466 path = path.strip('/')
456 467 cp_path = self.get_checkpoint_path(checkpoint_id, path)
457 468 stats = os.stat(cp_path)
458 469 last_modified = tz.utcfromtimestamp(stats.st_mtime)
459 470 info = dict(
460 471 id = checkpoint_id,
461 472 last_modified = last_modified,
462 473 )
463 474 return info
464 475
465 476 # public checkpoint API
466 477
467 478 def create_checkpoint(self, path):
468 479 """Create a checkpoint from the current state of a file"""
469 480 path = path.strip('/')
470 481 if not self.file_exists(path):
471 482 raise web.HTTPError(404)
472 483 src_path = self._get_os_path(path)
473 484 # only the one checkpoint ID:
474 485 checkpoint_id = u"checkpoint"
475 486 cp_path = self.get_checkpoint_path(checkpoint_id, path)
476 487 self.log.debug("creating checkpoint for %s", path)
477 488 self._copy(src_path, cp_path)
478 489
479 490 # return the checkpoint info
480 491 return self.get_checkpoint_model(checkpoint_id, path)
481 492
482 493 def list_checkpoints(self, path):
483 494 """list the checkpoints for a given file
484 495
485 496 This contents manager currently only supports one checkpoint per file.
486 497 """
487 498 path = path.strip('/')
488 499 checkpoint_id = "checkpoint"
489 500 os_path = self.get_checkpoint_path(checkpoint_id, path)
490 501 if not os.path.exists(os_path):
491 502 return []
492 503 else:
493 504 return [self.get_checkpoint_model(checkpoint_id, path)]
494 505
495 506
496 507 def restore_checkpoint(self, checkpoint_id, path):
497 508 """restore a file to a checkpointed state"""
498 509 path = path.strip('/')
499 510 self.log.info("restoring %s from checkpoint %s", path, checkpoint_id)
500 511 nb_path = self._get_os_path(path)
501 512 cp_path = self.get_checkpoint_path(checkpoint_id, path)
502 513 if not os.path.isfile(cp_path):
503 514 self.log.debug("checkpoint file does not exist: %s", cp_path)
504 515 raise web.HTTPError(404,
505 516 u'checkpoint does not exist: %s@%s' % (path, checkpoint_id)
506 517 )
507 518 # ensure notebook is readable (never restore from an unreadable notebook)
508 519 if cp_path.endswith('.ipynb'):
509 520 with io.open(cp_path, 'r', encoding='utf-8') as f:
510 521 nbformat.read(f, as_version=4)
511 522 self._copy(cp_path, nb_path)
512 523 self.log.debug("copying %s -> %s", cp_path, nb_path)
513 524
514 525 def delete_checkpoint(self, checkpoint_id, path):
515 526 """delete a file's checkpoint"""
516 527 path = path.strip('/')
517 528 cp_path = self.get_checkpoint_path(checkpoint_id, path)
518 529 if not os.path.isfile(cp_path):
519 530 raise web.HTTPError(404,
520 531 u'Checkpoint does not exist: %s@%s' % (path, checkpoint_id)
521 532 )
522 533 self.log.debug("unlinking %s", cp_path)
523 534 os.unlink(cp_path)
524 535
525 536 def info_string(self):
526 537 return "Serving notebooks from local directory: %s" % self.root_dir
527 538
528 539 def get_kernel_path(self, path, model=None):
529 540 """Return the initial working dir a kernel associated with a given notebook"""
530 541 if '/' in path:
531 542 parent_dir = path.rsplit('/', 1)[0]
532 543 else:
533 544 parent_dir = ''
534 545 return self._get_os_path(parent_dir)
@@ -1,257 +1,261 b''
1 1 """Tornado handlers for the contents web service."""
2 2
3 3 # Copyright (c) IPython Development Team.
4 4 # Distributed under the terms of the Modified BSD License.
5 5
6 6 import json
7 7
8 8 from tornado import web
9 9
10 10 from IPython.html.utils import url_path_join, url_escape
11 11 from IPython.utils.jsonutil import date_default
12 12
13 13 from IPython.html.base.handlers import (
14 14 IPythonHandler, json_errors, path_regex,
15 15 )
16 16
17 17
18 18 def sort_key(model):
19 19 """key function for case-insensitive sort by name and type"""
20 20 iname = model['name'].lower()
21 21 type_key = {
22 22 'directory' : '0',
23 23 'notebook' : '1',
24 24 'file' : '2',
25 25 }.get(model['type'], '9')
26 26 return u'%s%s' % (type_key, iname)
27 27
28 28 class ContentsHandler(IPythonHandler):
29 29
30 30 SUPPORTED_METHODS = (u'GET', u'PUT', u'PATCH', u'POST', u'DELETE')
31 31
32 32 def location_url(self, path):
33 33 """Return the full URL location of a file.
34 34
35 35 Parameters
36 36 ----------
37 37 path : unicode
38 38 The API path of the file, such as "foo/bar.txt".
39 39 """
40 40 return url_escape(url_path_join(
41 41 self.base_url, 'api', 'contents', path
42 42 ))
43 43
44 44 def _finish_model(self, model, location=True):
45 45 """Finish a JSON request with a model, setting relevant headers, etc."""
46 46 if location:
47 47 location = self.location_url(model['path'])
48 48 self.set_header('Location', location)
49 49 self.set_header('Last-Modified', model['last_modified'])
50 50 self.finish(json.dumps(model, default=date_default))
51 51
52 52 @web.authenticated
53 53 @json_errors
54 54 def get(self, path=''):
55 55 """Return a model for a file or directory.
56 56
57 57 A directory model contains a list of models (without content)
58 58 of the files and directories it contains.
59 59 """
60 60 path = path or ''
61 model = self.contents_manager.get_model(path=path)
61 type_ = self.get_query_argument('type', default=None)
62 if type_ not in {None, 'directory', 'file', 'notebook'}:
63 raise web.HTTPError(400, u'Type %r is invalid' % type_)
64
65 model = self.contents_manager.get_model(path=path, type_=type_)
62 66 if model['type'] == 'directory':
63 67 # group listing by type, then by name (case-insensitive)
64 68 # FIXME: sorting should be done in the frontends
65 69 model['content'].sort(key=sort_key)
66 70 self._finish_model(model, location=False)
67 71
68 72 @web.authenticated
69 73 @json_errors
70 74 def patch(self, path=''):
71 75 """PATCH renames a file or directory without re-uploading content."""
72 76 cm = self.contents_manager
73 77 model = self.get_json_body()
74 78 if model is None:
75 79 raise web.HTTPError(400, u'JSON body missing')
76 80 model = cm.update(model, path)
77 81 self._finish_model(model)
78 82
79 83 def _copy(self, copy_from, copy_to=None):
80 84 """Copy a file, optionally specifying a target directory."""
81 85 self.log.info(u"Copying {copy_from} to {copy_to}".format(
82 86 copy_from=copy_from,
83 87 copy_to=copy_to or '',
84 88 ))
85 89 model = self.contents_manager.copy(copy_from, copy_to)
86 90 self.set_status(201)
87 91 self._finish_model(model)
88 92
89 93 def _upload(self, model, path):
90 94 """Handle upload of a new file to path"""
91 95 self.log.info(u"Uploading file to %s", path)
92 96 model = self.contents_manager.new(model, path)
93 97 self.set_status(201)
94 98 self._finish_model(model)
95 99
96 100 def _new_untitled(self, path, type='', ext=''):
97 101 """Create a new, empty untitled entity"""
98 102 self.log.info(u"Creating new %s in %s", type or 'file', path)
99 103 model = self.contents_manager.new_untitled(path=path, type=type, ext=ext)
100 104 self.set_status(201)
101 105 self._finish_model(model)
102 106
103 107 def _save(self, model, path):
104 108 """Save an existing file."""
105 109 self.log.info(u"Saving file at %s", path)
106 110 model = self.contents_manager.save(model, path)
107 111 self._finish_model(model)
108 112
109 113 @web.authenticated
110 114 @json_errors
111 115 def post(self, path=''):
112 116 """Create a new file in the specified path.
113 117
114 118 POST creates new files. The server always decides on the name.
115 119
116 120 POST /api/contents/path
117 121 New untitled, empty file or directory.
118 122 POST /api/contents/path
119 123 with body {"copy_from" : "/path/to/OtherNotebook.ipynb"}
120 124 New copy of OtherNotebook in path
121 125 """
122 126
123 127 cm = self.contents_manager
124 128
125 129 if cm.file_exists(path):
126 130 raise web.HTTPError(400, "Cannot POST to files, use PUT instead.")
127 131
128 132 if not cm.dir_exists(path):
129 133 raise web.HTTPError(404, "No such directory: %s" % path)
130 134
131 135 model = self.get_json_body()
132 136
133 137 if model is not None:
134 138 copy_from = model.get('copy_from')
135 139 ext = model.get('ext', '')
136 140 type = model.get('type', '')
137 141 if copy_from:
138 142 self._copy(copy_from, path)
139 143 else:
140 144 self._new_untitled(path, type=type, ext=ext)
141 145 else:
142 146 self._new_untitled(path)
143 147
144 148 @web.authenticated
145 149 @json_errors
146 150 def put(self, path=''):
147 151 """Saves the file in the location specified by name and path.
148 152
149 153 PUT is very similar to POST, but the requester specifies the name,
150 154 whereas with POST, the server picks the name.
151 155
152 156 PUT /api/contents/path/Name.ipynb
153 157 Save notebook at ``path/Name.ipynb``. Notebook structure is specified
154 158 in `content` key of JSON request body. If content is not specified,
155 159 create a new empty notebook.
156 160 """
157 161 model = self.get_json_body()
158 162 if model:
159 163 if model.get('copy_from'):
160 164 raise web.HTTPError(400, "Cannot copy with PUT, only POST")
161 165 if self.contents_manager.file_exists(path):
162 166 self._save(model, path)
163 167 else:
164 168 self._upload(model, path)
165 169 else:
166 170 self._new_untitled(path)
167 171
168 172 @web.authenticated
169 173 @json_errors
170 174 def delete(self, path=''):
171 175 """delete a file in the given path"""
172 176 cm = self.contents_manager
173 177 self.log.warn('delete %s', path)
174 178 cm.delete(path)
175 179 self.set_status(204)
176 180 self.finish()
177 181
178 182
179 183 class CheckpointsHandler(IPythonHandler):
180 184
181 185 SUPPORTED_METHODS = ('GET', 'POST')
182 186
183 187 @web.authenticated
184 188 @json_errors
185 189 def get(self, path=''):
186 190 """get lists checkpoints for a file"""
187 191 cm = self.contents_manager
188 192 checkpoints = cm.list_checkpoints(path)
189 193 data = json.dumps(checkpoints, default=date_default)
190 194 self.finish(data)
191 195
192 196 @web.authenticated
193 197 @json_errors
194 198 def post(self, path=''):
195 199 """post creates a new checkpoint"""
196 200 cm = self.contents_manager
197 201 checkpoint = cm.create_checkpoint(path)
198 202 data = json.dumps(checkpoint, default=date_default)
199 203 location = url_path_join(self.base_url, 'api/contents',
200 204 path, 'checkpoints', checkpoint['id'])
201 205 self.set_header('Location', url_escape(location))
202 206 self.set_status(201)
203 207 self.finish(data)
204 208
205 209
206 210 class ModifyCheckpointsHandler(IPythonHandler):
207 211
208 212 SUPPORTED_METHODS = ('POST', 'DELETE')
209 213
210 214 @web.authenticated
211 215 @json_errors
212 216 def post(self, path, checkpoint_id):
213 217 """post restores a file from a checkpoint"""
214 218 cm = self.contents_manager
215 219 cm.restore_checkpoint(checkpoint_id, path)
216 220 self.set_status(204)
217 221 self.finish()
218 222
219 223 @web.authenticated
220 224 @json_errors
221 225 def delete(self, path, checkpoint_id):
222 226 """delete clears a checkpoint for a given file"""
223 227 cm = self.contents_manager
224 228 cm.delete_checkpoint(checkpoint_id, path)
225 229 self.set_status(204)
226 230 self.finish()
227 231
228 232
229 233 class NotebooksRedirectHandler(IPythonHandler):
230 234 """Redirect /api/notebooks to /api/contents"""
231 235 SUPPORTED_METHODS = ('GET', 'PUT', 'PATCH', 'POST', 'DELETE')
232 236
233 237 def get(self, path):
234 238 self.log.warn("/api/notebooks is deprecated, use /api/contents")
235 239 self.redirect(url_path_join(
236 240 self.base_url,
237 241 'api/contents',
238 242 path
239 243 ))
240 244
241 245 put = patch = post = delete = get
242 246
243 247
244 248 #-----------------------------------------------------------------------------
245 249 # URL to handler mappings
246 250 #-----------------------------------------------------------------------------
247 251
248 252
249 253 _checkpoint_id_regex = r"(?P<checkpoint_id>[\w-]+)"
250 254
251 255 default_handlers = [
252 256 (r"/api/contents%s/checkpoints" % path_regex, CheckpointsHandler),
253 257 (r"/api/contents%s/checkpoints/%s" % (path_regex, _checkpoint_id_regex),
254 258 ModifyCheckpointsHandler),
255 259 (r"/api/contents%s" % path_regex, ContentsHandler),
256 260 (r"/api/notebooks/?(.*)", NotebooksRedirectHandler),
257 261 ]
@@ -1,373 +1,373 b''
1 1 """A base class for contents managers."""
2 2
3 3 # Copyright (c) IPython Development Team.
4 4 # Distributed under the terms of the Modified BSD License.
5 5
6 6 from fnmatch import fnmatch
7 7 import itertools
8 8 import json
9 9 import os
10 10
11 11 from tornado.web import HTTPError
12 12
13 13 from IPython.config.configurable import LoggingConfigurable
14 14 from IPython.nbformat import sign, validate, ValidationError
15 15 from IPython.nbformat.v4 import new_notebook
16 16 from IPython.utils.traitlets import Instance, Unicode, List
17 17
18 18
19 19 class ContentsManager(LoggingConfigurable):
20 20 """Base class for serving files and directories.
21 21
22 22 This serves any text or binary file,
23 23 as well as directories,
24 24 with special handling for JSON notebook documents.
25 25
26 26 Most APIs take a path argument,
27 27 which is always an API-style unicode path,
28 28 and always refers to a directory.
29 29
30 30 - unicode, not url-escaped
31 31 - '/'-separated
32 32 - leading and trailing '/' will be stripped
33 33 - if unspecified, path defaults to '',
34 34 indicating the root path.
35 35
36 36 """
37 37
38 38 notary = Instance(sign.NotebookNotary)
39 39 def _notary_default(self):
40 40 return sign.NotebookNotary(parent=self)
41 41
42 42 hide_globs = List(Unicode, [
43 43 u'__pycache__', '*.pyc', '*.pyo',
44 44 '.DS_Store', '*.so', '*.dylib', '*~',
45 45 ], config=True, help="""
46 46 Glob patterns to hide in file and directory listings.
47 47 """)
48 48
49 49 untitled_notebook = Unicode("Untitled", config=True,
50 50 help="The base name used when creating untitled notebooks."
51 51 )
52 52
53 53 untitled_file = Unicode("untitled", config=True,
54 54 help="The base name used when creating untitled files."
55 55 )
56 56
57 57 untitled_directory = Unicode("Untitled Folder", config=True,
58 58 help="The base name used when creating untitled directories."
59 59 )
60 60
61 61 # ContentsManager API part 1: methods that must be
62 62 # implemented in subclasses.
63 63
64 64 def dir_exists(self, path):
65 65 """Does the API-style path (directory) actually exist?
66 66
67 67 Like os.path.isdir
68 68
69 69 Override this method in subclasses.
70 70
71 71 Parameters
72 72 ----------
73 73 path : string
74 74 The path to check
75 75
76 76 Returns
77 77 -------
78 78 exists : bool
79 79 Whether the path does indeed exist.
80 80 """
81 81 raise NotImplementedError
82 82
83 83 def is_hidden(self, path):
84 84 """Does the API style path correspond to a hidden directory or file?
85 85
86 86 Parameters
87 87 ----------
88 88 path : string
89 89 The path to check. This is an API path (`/` separated,
90 90 relative to root dir).
91 91
92 92 Returns
93 93 -------
94 94 hidden : bool
95 95 Whether the path is hidden.
96 96
97 97 """
98 98 raise NotImplementedError
99 99
100 100 def file_exists(self, path=''):
101 101 """Does a file exist at the given path?
102 102
103 103 Like os.path.isfile
104 104
105 105 Override this method in subclasses.
106 106
107 107 Parameters
108 108 ----------
109 109 name : string
110 110 The name of the file you are checking.
111 111 path : string
112 112 The relative path to the file's directory (with '/' as separator)
113 113
114 114 Returns
115 115 -------
116 116 exists : bool
117 117 Whether the file exists.
118 118 """
119 119 raise NotImplementedError('must be implemented in a subclass')
120 120
121 121 def exists(self, path):
122 122 """Does a file or directory exist at the given path?
123 123
124 124 Like os.path.exists
125 125
126 126 Parameters
127 127 ----------
128 128 path : string
129 129 The relative path to the file's directory (with '/' as separator)
130 130
131 131 Returns
132 132 -------
133 133 exists : bool
134 134 Whether the target exists.
135 135 """
136 136 return self.file_exists(path) or self.dir_exists(path)
137 137
138 def get_model(self, path, content=True):
138 def get_model(self, path, content=True, type_=None):
139 139 """Get the model of a file or directory with or without content."""
140 140 raise NotImplementedError('must be implemented in a subclass')
141 141
142 142 def save(self, model, path):
143 143 """Save the file or directory and return the model with no content."""
144 144 raise NotImplementedError('must be implemented in a subclass')
145 145
146 146 def update(self, model, path):
147 147 """Update the file or directory and return the model with no content.
148 148
149 149 For use in PATCH requests, to enable renaming a file without
150 150 re-uploading its contents. Only used for renaming at the moment.
151 151 """
152 152 raise NotImplementedError('must be implemented in a subclass')
153 153
154 154 def delete(self, path):
155 155 """Delete file or directory by path."""
156 156 raise NotImplementedError('must be implemented in a subclass')
157 157
158 158 def create_checkpoint(self, path):
159 159 """Create a checkpoint of the current state of a file
160 160
161 161 Returns a checkpoint_id for the new checkpoint.
162 162 """
163 163 raise NotImplementedError("must be implemented in a subclass")
164 164
165 165 def list_checkpoints(self, path):
166 166 """Return a list of checkpoints for a given file"""
167 167 return []
168 168
169 169 def restore_checkpoint(self, checkpoint_id, path):
170 170 """Restore a file from one of its checkpoints"""
171 171 raise NotImplementedError("must be implemented in a subclass")
172 172
173 173 def delete_checkpoint(self, checkpoint_id, path):
174 174 """delete a checkpoint for a file"""
175 175 raise NotImplementedError("must be implemented in a subclass")
176 176
177 177 # ContentsManager API part 2: methods that have useable default
178 178 # implementations, but can be overridden in subclasses.
179 179
180 180 def info_string(self):
181 181 return "Serving contents"
182 182
183 183 def get_kernel_path(self, path, model=None):
184 184 """Return the API path for the kernel
185 185
186 186 KernelManagers can turn this value into a filesystem path,
187 187 or ignore it altogether.
188 188 """
189 189 return path
190 190
191 191 def increment_filename(self, filename, path=''):
192 192 """Increment a filename until it is unique.
193 193
194 194 Parameters
195 195 ----------
196 196 filename : unicode
197 197 The name of a file, including extension
198 198 path : unicode
199 199 The API path of the target's directory
200 200
201 201 Returns
202 202 -------
203 203 name : unicode
204 204 A filename that is unique, based on the input filename.
205 205 """
206 206 path = path.strip('/')
207 207 basename, ext = os.path.splitext(filename)
208 208 for i in itertools.count():
209 209 name = u'{basename}{i}{ext}'.format(basename=basename, i=i,
210 210 ext=ext)
211 211 if not self.exists(u'{}/{}'.format(path, name)):
212 212 break
213 213 return name
214 214
215 215 def validate_notebook_model(self, model):
216 216 """Add failed-validation message to model"""
217 217 try:
218 218 validate(model['content'])
219 219 except ValidationError as e:
220 220 model['message'] = u'Notebook Validation failed: {}:\n{}'.format(
221 221 e.message, json.dumps(e.instance, indent=1, default=lambda obj: '<UNKNOWN>'),
222 222 )
223 223 return model
224 224
225 225 def new_untitled(self, path='', type='', ext=''):
226 226 """Create a new untitled file or directory in path
227 227
228 228 path must be a directory
229 229
230 230 File extension can be specified.
231 231
232 232 Use `new` to create files with a fully specified path (including filename).
233 233 """
234 234 path = path.strip('/')
235 235 if not self.dir_exists(path):
236 236 raise HTTPError(404, 'No such directory: %s' % path)
237 237
238 238 model = {}
239 239 if type:
240 240 model['type'] = type
241 241
242 242 if ext == '.ipynb':
243 243 model.setdefault('type', 'notebook')
244 244 else:
245 245 model.setdefault('type', 'file')
246 246
247 247 if model['type'] == 'directory':
248 248 untitled = self.untitled_directory
249 249 elif model['type'] == 'notebook':
250 250 untitled = self.untitled_notebook
251 251 ext = '.ipynb'
252 252 elif model['type'] == 'file':
253 253 untitled = self.untitled_file
254 254 else:
255 255 raise HTTPError(400, "Unexpected model type: %r" % model['type'])
256 256
257 257 name = self.increment_filename(untitled + ext, path)
258 258 path = u'{0}/{1}'.format(path, name)
259 259 return self.new(model, path)
260 260
261 261 def new(self, model=None, path=''):
262 262 """Create a new file or directory and return its model with no content.
263 263
264 264 To create a new untitled entity in a directory, use `new_untitled`.
265 265 """
266 266 path = path.strip('/')
267 267 if model is None:
268 268 model = {}
269 269
270 270 if path.endswith('.ipynb'):
271 271 model.setdefault('type', 'notebook')
272 272 else:
273 273 model.setdefault('type', 'file')
274 274
275 275 # no content, not a directory, so fill out new-file model
276 276 if 'content' not in model and model['type'] != 'directory':
277 277 if model['type'] == 'notebook':
278 278 model['content'] = new_notebook()
279 279 model['format'] = 'json'
280 280 else:
281 281 model['content'] = ''
282 282 model['type'] = 'file'
283 283 model['format'] = 'text'
284 284
285 285 model = self.save(model, path)
286 286 return model
287 287
288 288 def copy(self, from_path, to_path=None):
289 289 """Copy an existing file and return its new model.
290 290
291 291 If to_path not specified, it will be the parent directory of from_path.
292 292 If to_path is a directory, filename will increment `from_path-Copy#.ext`.
293 293
294 294 from_path must be a full path to a file.
295 295 """
296 296 path = from_path.strip('/')
297 297 if '/' in path:
298 298 from_dir, from_name = path.rsplit('/', 1)
299 299 else:
300 300 from_dir = ''
301 301 from_name = path
302 302
303 303 model = self.get_model(path)
304 304 model.pop('path', None)
305 305 model.pop('name', None)
306 306 if model['type'] == 'directory':
307 307 raise HTTPError(400, "Can't copy directories")
308 308
309 309 if not to_path:
310 310 to_path = from_dir
311 311 if self.dir_exists(to_path):
312 312 base, ext = os.path.splitext(from_name)
313 313 copy_name = u'{0}-Copy{1}'.format(base, ext)
314 314 to_name = self.increment_filename(copy_name, to_path)
315 315 to_path = u'{0}/{1}'.format(to_path, to_name)
316 316
317 317 model = self.save(model, to_path)
318 318 return model
319 319
320 320 def log_info(self):
321 321 self.log.info(self.info_string())
322 322
323 323 def trust_notebook(self, path):
324 324 """Explicitly trust a notebook
325 325
326 326 Parameters
327 327 ----------
328 328 path : string
329 329 The path of a notebook
330 330 """
331 331 model = self.get_model(path)
332 332 nb = model['content']
333 333 self.log.warn("Trusting notebook %s", path)
334 334 self.notary.mark_cells(nb, True)
335 335 self.save(model, path)
336 336
337 337 def check_and_sign(self, nb, path=''):
338 338 """Check for trusted cells, and sign the notebook.
339 339
340 340 Called as a part of saving notebooks.
341 341
342 342 Parameters
343 343 ----------
344 344 nb : dict
345 345 The notebook dict
346 346 path : string
347 347 The notebook's path (for logging)
348 348 """
349 349 if self.notary.check_cells(nb):
350 350 self.notary.sign(nb)
351 351 else:
352 352 self.log.warn("Saving untrusted notebook %s", path)
353 353
354 354 def mark_trusted_cells(self, nb, path=''):
355 355 """Mark cells as trusted if the notebook signature matches.
356 356
357 357 Called as a part of loading notebooks.
358 358
359 359 Parameters
360 360 ----------
361 361 nb : dict
362 362 The notebook object (in current nbformat)
363 363 path : string
364 364 The notebook's path (for logging)
365 365 """
366 366 trusted = self.notary.check_signature(nb)
367 367 if not trusted:
368 368 self.log.warn("Notebook %s is not trusted", path)
369 369 self.notary.mark_cells(nb, trusted)
370 370
371 371 def should_list(self, name):
372 372 """Should this file/directory name be displayed in a listing?"""
373 373 return not any(fnmatch(name, glob) for glob in self.hide_globs)
@@ -1,495 +1,504 b''
1 1 # coding: utf-8
2 2 """Test the contents webservice API."""
3 3
4 4 import base64
5 5 import io
6 6 import json
7 7 import os
8 8 import shutil
9 9 from unicodedata import normalize
10 10
11 11 pjoin = os.path.join
12 12
13 13 import requests
14 14
15 15 from IPython.html.utils import url_path_join, url_escape
16 16 from IPython.html.tests.launchnotebook import NotebookTestBase, assert_http_error
17 17 from IPython.nbformat import read, write, from_dict
18 18 from IPython.nbformat.v4 import (
19 19 new_notebook, new_markdown_cell,
20 20 )
21 21 from IPython.nbformat import v2
22 22 from IPython.utils import py3compat
23 23 from IPython.utils.data import uniq_stable
24 24
25 25
26 26 def notebooks_only(dir_model):
27 27 return [nb for nb in dir_model['content'] if nb['type']=='notebook']
28 28
29 29 def dirs_only(dir_model):
30 30 return [x for x in dir_model['content'] if x['type']=='directory']
31 31
32 32
33 33 class API(object):
34 34 """Wrapper for contents API calls."""
35 35 def __init__(self, base_url):
36 36 self.base_url = base_url
37 37
38 38 def _req(self, verb, path, body=None):
39 39 response = requests.request(verb,
40 40 url_path_join(self.base_url, 'api/contents', path),
41 41 data=body,
42 42 )
43 43 response.raise_for_status()
44 44 return response
45 45
46 46 def list(self, path='/'):
47 47 return self._req('GET', path)
48 48
49 def read(self, path):
49 def read(self, path, type_=None):
50 if type_ is not None:
51 path += '?type=' + type_
50 52 return self._req('GET', path)
51 53
52 54 def create_untitled(self, path='/', ext='.ipynb'):
53 55 body = None
54 56 if ext:
55 57 body = json.dumps({'ext': ext})
56 58 return self._req('POST', path, body)
57 59
58 60 def mkdir_untitled(self, path='/'):
59 61 return self._req('POST', path, json.dumps({'type': 'directory'}))
60 62
61 63 def copy(self, copy_from, path='/'):
62 64 body = json.dumps({'copy_from':copy_from})
63 65 return self._req('POST', path, body)
64 66
65 67 def create(self, path='/'):
66 68 return self._req('PUT', path)
67 69
68 70 def upload(self, path, body):
69 71 return self._req('PUT', path, body)
70 72
71 73 def mkdir_untitled(self, path='/'):
72 74 return self._req('POST', path, json.dumps({'type': 'directory'}))
73 75
74 76 def mkdir(self, path='/'):
75 77 return self._req('PUT', path, json.dumps({'type': 'directory'}))
76 78
77 79 def copy_put(self, copy_from, path='/'):
78 80 body = json.dumps({'copy_from':copy_from})
79 81 return self._req('PUT', path, body)
80 82
81 83 def save(self, path, body):
82 84 return self._req('PUT', path, body)
83 85
84 86 def delete(self, path='/'):
85 87 return self._req('DELETE', path)
86 88
87 89 def rename(self, path, new_path):
88 90 body = json.dumps({'path': new_path})
89 91 return self._req('PATCH', path, body)
90 92
91 93 def get_checkpoints(self, path):
92 94 return self._req('GET', url_path_join(path, 'checkpoints'))
93 95
94 96 def new_checkpoint(self, path):
95 97 return self._req('POST', url_path_join(path, 'checkpoints'))
96 98
97 99 def restore_checkpoint(self, path, checkpoint_id):
98 100 return self._req('POST', url_path_join(path, 'checkpoints', checkpoint_id))
99 101
100 102 def delete_checkpoint(self, path, checkpoint_id):
101 103 return self._req('DELETE', url_path_join(path, 'checkpoints', checkpoint_id))
102 104
103 105 class APITest(NotebookTestBase):
104 106 """Test the kernels web service API"""
105 107 dirs_nbs = [('', 'inroot'),
106 108 ('Directory with spaces in', 'inspace'),
107 109 (u'unicodΓ©', 'innonascii'),
108 110 ('foo', 'a'),
109 111 ('foo', 'b'),
110 112 ('foo', 'name with spaces'),
111 113 ('foo', u'unicodΓ©'),
112 114 ('foo/bar', 'baz'),
113 115 ('ordering', 'A'),
114 116 ('ordering', 'b'),
115 117 ('ordering', 'C'),
116 118 (u'Γ₯ b', u'Γ§ d'),
117 119 ]
118 120 hidden_dirs = ['.hidden', '__pycache__']
119 121
120 122 dirs = uniq_stable([py3compat.cast_unicode(d) for (d,n) in dirs_nbs])
121 123 del dirs[0] # remove ''
122 124 top_level_dirs = {normalize('NFC', d.split('/')[0]) for d in dirs}
123 125
124 126 @staticmethod
125 127 def _blob_for_name(name):
126 128 return name.encode('utf-8') + b'\xFF'
127 129
128 130 @staticmethod
129 131 def _txt_for_name(name):
130 132 return u'%s text file' % name
131 133
132 134 def setUp(self):
133 135 nbdir = self.notebook_dir.name
134 136 self.blob = os.urandom(100)
135 137 self.b64_blob = base64.encodestring(self.blob).decode('ascii')
136 138
137 139 for d in (self.dirs + self.hidden_dirs):
138 140 d.replace('/', os.sep)
139 141 if not os.path.isdir(pjoin(nbdir, d)):
140 142 os.mkdir(pjoin(nbdir, d))
141 143
142 144 for d, name in self.dirs_nbs:
143 145 d = d.replace('/', os.sep)
144 146 # create a notebook
145 147 with io.open(pjoin(nbdir, d, '%s.ipynb' % name), 'w',
146 148 encoding='utf-8') as f:
147 149 nb = new_notebook()
148 150 write(nb, f, version=4)
149 151
150 152 # create a text file
151 153 with io.open(pjoin(nbdir, d, '%s.txt' % name), 'w',
152 154 encoding='utf-8') as f:
153 155 f.write(self._txt_for_name(name))
154 156
155 157 # create a binary file
156 158 with io.open(pjoin(nbdir, d, '%s.blob' % name), 'wb') as f:
157 159 f.write(self._blob_for_name(name))
158 160
159 161 self.api = API(self.base_url())
160 162
161 163 def tearDown(self):
162 164 nbdir = self.notebook_dir.name
163 165
164 166 for dname in (list(self.top_level_dirs) + self.hidden_dirs):
165 167 shutil.rmtree(pjoin(nbdir, dname), ignore_errors=True)
166 168
167 169 if os.path.isfile(pjoin(nbdir, 'inroot.ipynb')):
168 170 os.unlink(pjoin(nbdir, 'inroot.ipynb'))
169 171
170 172 def test_list_notebooks(self):
171 173 nbs = notebooks_only(self.api.list().json())
172 174 self.assertEqual(len(nbs), 1)
173 175 self.assertEqual(nbs[0]['name'], 'inroot.ipynb')
174 176
175 177 nbs = notebooks_only(self.api.list('/Directory with spaces in/').json())
176 178 self.assertEqual(len(nbs), 1)
177 179 self.assertEqual(nbs[0]['name'], 'inspace.ipynb')
178 180
179 181 nbs = notebooks_only(self.api.list(u'/unicodΓ©/').json())
180 182 self.assertEqual(len(nbs), 1)
181 183 self.assertEqual(nbs[0]['name'], 'innonascii.ipynb')
182 184 self.assertEqual(nbs[0]['path'], u'unicodΓ©/innonascii.ipynb')
183 185
184 186 nbs = notebooks_only(self.api.list('/foo/bar/').json())
185 187 self.assertEqual(len(nbs), 1)
186 188 self.assertEqual(nbs[0]['name'], 'baz.ipynb')
187 189 self.assertEqual(nbs[0]['path'], 'foo/bar/baz.ipynb')
188 190
189 191 nbs = notebooks_only(self.api.list('foo').json())
190 192 self.assertEqual(len(nbs), 4)
191 193 nbnames = { normalize('NFC', n['name']) for n in nbs }
192 194 expected = [ u'a.ipynb', u'b.ipynb', u'name with spaces.ipynb', u'unicodΓ©.ipynb']
193 195 expected = { normalize('NFC', name) for name in expected }
194 196 self.assertEqual(nbnames, expected)
195 197
196 198 nbs = notebooks_only(self.api.list('ordering').json())
197 199 nbnames = [n['name'] for n in nbs]
198 200 expected = ['A.ipynb', 'b.ipynb', 'C.ipynb']
199 201 self.assertEqual(nbnames, expected)
200 202
201 203 def test_list_dirs(self):
202 204 print(self.api.list().json())
203 205 dirs = dirs_only(self.api.list().json())
204 206 dir_names = {normalize('NFC', d['name']) for d in dirs}
205 207 print(dir_names)
206 208 print(self.top_level_dirs)
207 209 self.assertEqual(dir_names, self.top_level_dirs) # Excluding hidden dirs
208 210
209 211 def test_list_nonexistant_dir(self):
210 212 with assert_http_error(404):
211 213 self.api.list('nonexistant')
212 214
213 215 def test_get_nb_contents(self):
214 216 for d, name in self.dirs_nbs:
215 217 path = url_path_join(d, name + '.ipynb')
216 218 nb = self.api.read(path).json()
217 219 self.assertEqual(nb['name'], u'%s.ipynb' % name)
218 220 self.assertEqual(nb['path'], path)
219 221 self.assertEqual(nb['type'], 'notebook')
220 222 self.assertIn('content', nb)
221 223 self.assertEqual(nb['format'], 'json')
222 224 self.assertIn('content', nb)
223 225 self.assertIn('metadata', nb['content'])
224 226 self.assertIsInstance(nb['content']['metadata'], dict)
225 227
226 228 def test_get_contents_no_such_file(self):
227 229 # Name that doesn't exist - should be a 404
228 230 with assert_http_error(404):
229 231 self.api.read('foo/q.ipynb')
230 232
231 233 def test_get_text_file_contents(self):
232 234 for d, name in self.dirs_nbs:
233 235 path = url_path_join(d, name + '.txt')
234 236 model = self.api.read(path).json()
235 237 self.assertEqual(model['name'], u'%s.txt' % name)
236 238 self.assertEqual(model['path'], path)
237 239 self.assertIn('content', model)
238 240 self.assertEqual(model['format'], 'text')
239 241 self.assertEqual(model['type'], 'file')
240 242 self.assertEqual(model['content'], self._txt_for_name(name))
241 243
242 244 # Name that doesn't exist - should be a 404
243 245 with assert_http_error(404):
244 246 self.api.read('foo/q.txt')
245 247
246 248 def test_get_binary_file_contents(self):
247 249 for d, name in self.dirs_nbs:
248 250 path = url_path_join(d, name + '.blob')
249 251 model = self.api.read(path).json()
250 252 self.assertEqual(model['name'], u'%s.blob' % name)
251 253 self.assertEqual(model['path'], path)
252 254 self.assertIn('content', model)
253 255 self.assertEqual(model['format'], 'base64')
254 256 self.assertEqual(model['type'], 'file')
255 257 b64_data = base64.encodestring(self._blob_for_name(name)).decode('ascii')
256 258 self.assertEqual(model['content'], b64_data)
257 259
258 260 # Name that doesn't exist - should be a 404
259 261 with assert_http_error(404):
260 262 self.api.read('foo/q.txt')
261 263
264 def test_get_bad_type(self):
265 with assert_http_error(400):
266 self.api.read(u'unicodΓ©', type_='file') # this is a directory
267
268 with assert_http_error(400):
269 self.api.read(u'unicodΓ©/innonascii.ipynb', type_='directory')
270
262 271 def _check_created(self, resp, path, type='notebook'):
263 272 self.assertEqual(resp.status_code, 201)
264 273 location_header = py3compat.str_to_unicode(resp.headers['Location'])
265 274 self.assertEqual(location_header, url_escape(url_path_join(u'/api/contents', path)))
266 275 rjson = resp.json()
267 276 self.assertEqual(rjson['name'], path.rsplit('/', 1)[-1])
268 277 self.assertEqual(rjson['path'], path)
269 278 self.assertEqual(rjson['type'], type)
270 279 isright = os.path.isdir if type == 'directory' else os.path.isfile
271 280 assert isright(pjoin(
272 281 self.notebook_dir.name,
273 282 path.replace('/', os.sep),
274 283 ))
275 284
276 285 def test_create_untitled(self):
277 286 resp = self.api.create_untitled(path=u'Γ₯ b')
278 287 self._check_created(resp, u'Γ₯ b/Untitled0.ipynb')
279 288
280 289 # Second time
281 290 resp = self.api.create_untitled(path=u'Γ₯ b')
282 291 self._check_created(resp, u'Γ₯ b/Untitled1.ipynb')
283 292
284 293 # And two directories down
285 294 resp = self.api.create_untitled(path='foo/bar')
286 295 self._check_created(resp, 'foo/bar/Untitled0.ipynb')
287 296
288 297 def test_create_untitled_txt(self):
289 298 resp = self.api.create_untitled(path='foo/bar', ext='.txt')
290 299 self._check_created(resp, 'foo/bar/untitled0.txt', type='file')
291 300
292 301 resp = self.api.read(path='foo/bar/untitled0.txt')
293 302 model = resp.json()
294 303 self.assertEqual(model['type'], 'file')
295 304 self.assertEqual(model['format'], 'text')
296 305 self.assertEqual(model['content'], '')
297 306
298 307 def test_upload(self):
299 308 nb = new_notebook()
300 309 nbmodel = {'content': nb, 'type': 'notebook'}
301 310 path = u'Γ₯ b/Upload tΓ©st.ipynb'
302 311 resp = self.api.upload(path, body=json.dumps(nbmodel))
303 312 self._check_created(resp, path)
304 313
305 314 def test_mkdir_untitled(self):
306 315 resp = self.api.mkdir_untitled(path=u'Γ₯ b')
307 316 self._check_created(resp, u'Γ₯ b/Untitled Folder0', type='directory')
308 317
309 318 # Second time
310 319 resp = self.api.mkdir_untitled(path=u'Γ₯ b')
311 320 self._check_created(resp, u'Γ₯ b/Untitled Folder1', type='directory')
312 321
313 322 # And two directories down
314 323 resp = self.api.mkdir_untitled(path='foo/bar')
315 324 self._check_created(resp, 'foo/bar/Untitled Folder0', type='directory')
316 325
317 326 def test_mkdir(self):
318 327 path = u'Γ₯ b/New βˆ‚ir'
319 328 resp = self.api.mkdir(path)
320 329 self._check_created(resp, path, type='directory')
321 330
322 331 def test_mkdir_hidden_400(self):
323 332 with assert_http_error(400):
324 333 resp = self.api.mkdir(u'Γ₯ b/.hidden')
325 334
326 335 def test_upload_txt(self):
327 336 body = u'ΓΌnicode tΓ©xt'
328 337 model = {
329 338 'content' : body,
330 339 'format' : 'text',
331 340 'type' : 'file',
332 341 }
333 342 path = u'Γ₯ b/Upload tΓ©st.txt'
334 343 resp = self.api.upload(path, body=json.dumps(model))
335 344
336 345 # check roundtrip
337 346 resp = self.api.read(path)
338 347 model = resp.json()
339 348 self.assertEqual(model['type'], 'file')
340 349 self.assertEqual(model['format'], 'text')
341 350 self.assertEqual(model['content'], body)
342 351
343 352 def test_upload_b64(self):
344 353 body = b'\xFFblob'
345 354 b64body = base64.encodestring(body).decode('ascii')
346 355 model = {
347 356 'content' : b64body,
348 357 'format' : 'base64',
349 358 'type' : 'file',
350 359 }
351 360 path = u'Γ₯ b/Upload tΓ©st.blob'
352 361 resp = self.api.upload(path, body=json.dumps(model))
353 362
354 363 # check roundtrip
355 364 resp = self.api.read(path)
356 365 model = resp.json()
357 366 self.assertEqual(model['type'], 'file')
358 367 self.assertEqual(model['path'], path)
359 368 self.assertEqual(model['format'], 'base64')
360 369 decoded = base64.decodestring(model['content'].encode('ascii'))
361 370 self.assertEqual(decoded, body)
362 371
363 372 def test_upload_v2(self):
364 373 nb = v2.new_notebook()
365 374 ws = v2.new_worksheet()
366 375 nb.worksheets.append(ws)
367 376 ws.cells.append(v2.new_code_cell(input='print("hi")'))
368 377 nbmodel = {'content': nb, 'type': 'notebook'}
369 378 path = u'Γ₯ b/Upload tΓ©st.ipynb'
370 379 resp = self.api.upload(path, body=json.dumps(nbmodel))
371 380 self._check_created(resp, path)
372 381 resp = self.api.read(path)
373 382 data = resp.json()
374 383 self.assertEqual(data['content']['nbformat'], 4)
375 384
376 385 def test_copy(self):
377 386 resp = self.api.copy(u'Γ₯ b/Γ§ d.ipynb', u'unicodΓ©')
378 387 self._check_created(resp, u'unicodΓ©/Γ§ d-Copy0.ipynb')
379 388
380 389 resp = self.api.copy(u'Γ₯ b/Γ§ d.ipynb', u'Γ₯ b')
381 390 self._check_created(resp, u'Γ₯ b/Γ§ d-Copy0.ipynb')
382 391
383 392 def test_copy_path(self):
384 393 resp = self.api.copy(u'foo/a.ipynb', u'Γ₯ b')
385 394 self._check_created(resp, u'Γ₯ b/a-Copy0.ipynb')
386 395
387 396 def test_copy_put_400(self):
388 397 with assert_http_error(400):
389 398 resp = self.api.copy_put(u'Γ₯ b/Γ§ d.ipynb', u'Γ₯ b/cΓΈpy.ipynb')
390 399
391 400 def test_copy_dir_400(self):
392 401 # can't copy directories
393 402 with assert_http_error(400):
394 403 resp = self.api.copy(u'Γ₯ b', u'foo')
395 404
396 405 def test_delete(self):
397 406 for d, name in self.dirs_nbs:
398 407 print('%r, %r' % (d, name))
399 408 resp = self.api.delete(url_path_join(d, name + '.ipynb'))
400 409 self.assertEqual(resp.status_code, 204)
401 410
402 411 for d in self.dirs + ['/']:
403 412 nbs = notebooks_only(self.api.list(d).json())
404 413 print('------')
405 414 print(d)
406 415 print(nbs)
407 416 self.assertEqual(nbs, [])
408 417
409 418 def test_delete_dirs(self):
410 419 # depth-first delete everything, so we don't try to delete empty directories
411 420 for name in sorted(self.dirs + ['/'], key=len, reverse=True):
412 421 listing = self.api.list(name).json()['content']
413 422 for model in listing:
414 423 self.api.delete(model['path'])
415 424 listing = self.api.list('/').json()['content']
416 425 self.assertEqual(listing, [])
417 426
418 427 def test_delete_non_empty_dir(self):
419 428 """delete non-empty dir raises 400"""
420 429 with assert_http_error(400):
421 430 self.api.delete(u'Γ₯ b')
422 431
423 432 def test_rename(self):
424 433 resp = self.api.rename('foo/a.ipynb', 'foo/z.ipynb')
425 434 self.assertEqual(resp.headers['Location'].split('/')[-1], 'z.ipynb')
426 435 self.assertEqual(resp.json()['name'], 'z.ipynb')
427 436 self.assertEqual(resp.json()['path'], 'foo/z.ipynb')
428 437 assert os.path.isfile(pjoin(self.notebook_dir.name, 'foo', 'z.ipynb'))
429 438
430 439 nbs = notebooks_only(self.api.list('foo').json())
431 440 nbnames = set(n['name'] for n in nbs)
432 441 self.assertIn('z.ipynb', nbnames)
433 442 self.assertNotIn('a.ipynb', nbnames)
434 443
435 444 def test_rename_existing(self):
436 445 with assert_http_error(409):
437 446 self.api.rename('foo/a.ipynb', 'foo/b.ipynb')
438 447
439 448 def test_save(self):
440 449 resp = self.api.read('foo/a.ipynb')
441 450 nbcontent = json.loads(resp.text)['content']
442 451 nb = from_dict(nbcontent)
443 452 nb.cells.append(new_markdown_cell(u'Created by test Β³'))
444 453
445 454 nbmodel= {'content': nb, 'type': 'notebook'}
446 455 resp = self.api.save('foo/a.ipynb', body=json.dumps(nbmodel))
447 456
448 457 nbfile = pjoin(self.notebook_dir.name, 'foo', 'a.ipynb')
449 458 with io.open(nbfile, 'r', encoding='utf-8') as f:
450 459 newnb = read(f, as_version=4)
451 460 self.assertEqual(newnb.cells[0].source,
452 461 u'Created by test Β³')
453 462 nbcontent = self.api.read('foo/a.ipynb').json()['content']
454 463 newnb = from_dict(nbcontent)
455 464 self.assertEqual(newnb.cells[0].source,
456 465 u'Created by test Β³')
457 466
458 467
459 468 def test_checkpoints(self):
460 469 resp = self.api.read('foo/a.ipynb')
461 470 r = self.api.new_checkpoint('foo/a.ipynb')
462 471 self.assertEqual(r.status_code, 201)
463 472 cp1 = r.json()
464 473 self.assertEqual(set(cp1), {'id', 'last_modified'})
465 474 self.assertEqual(r.headers['Location'].split('/')[-1], cp1['id'])
466 475
467 476 # Modify it
468 477 nbcontent = json.loads(resp.text)['content']
469 478 nb = from_dict(nbcontent)
470 479 hcell = new_markdown_cell('Created by test')
471 480 nb.cells.append(hcell)
472 481 # Save
473 482 nbmodel= {'content': nb, 'type': 'notebook'}
474 483 resp = self.api.save('foo/a.ipynb', body=json.dumps(nbmodel))
475 484
476 485 # List checkpoints
477 486 cps = self.api.get_checkpoints('foo/a.ipynb').json()
478 487 self.assertEqual(cps, [cp1])
479 488
480 489 nbcontent = self.api.read('foo/a.ipynb').json()['content']
481 490 nb = from_dict(nbcontent)
482 491 self.assertEqual(nb.cells[0].source, 'Created by test')
483 492
484 493 # Restore cp1
485 494 r = self.api.restore_checkpoint('foo/a.ipynb', cp1['id'])
486 495 self.assertEqual(r.status_code, 204)
487 496 nbcontent = self.api.read('foo/a.ipynb').json()['content']
488 497 nb = from_dict(nbcontent)
489 498 self.assertEqual(nb.cells, [])
490 499
491 500 # Delete cp1
492 501 r = self.api.delete_checkpoint('foo/a.ipynb', cp1['id'])
493 502 self.assertEqual(r.status_code, 204)
494 503 cps = self.api.get_checkpoints('foo/a.ipynb').json()
495 504 self.assertEqual(cps, [])
@@ -1,348 +1,362 b''
1 1 # coding: utf-8
2 2 """Tests for the notebook manager."""
3 3 from __future__ import print_function
4 4
5 5 import logging
6 6 import os
7 7
8 8 from tornado.web import HTTPError
9 9 from unittest import TestCase
10 10 from tempfile import NamedTemporaryFile
11 11
12 12 from IPython.nbformat import v4 as nbformat
13 13
14 14 from IPython.utils.tempdir import TemporaryDirectory
15 15 from IPython.utils.traitlets import TraitError
16 16 from IPython.html.utils import url_path_join
17 17 from IPython.testing import decorators as dec
18 18
19 19 from ..filemanager import FileContentsManager
20 20 from ..manager import ContentsManager
21 21
22 22
23 23 class TestFileContentsManager(TestCase):
24 24
25 25 def test_root_dir(self):
26 26 with TemporaryDirectory() as td:
27 27 fm = FileContentsManager(root_dir=td)
28 28 self.assertEqual(fm.root_dir, td)
29 29
30 30 def test_missing_root_dir(self):
31 31 with TemporaryDirectory() as td:
32 32 root = os.path.join(td, 'notebook', 'dir', 'is', 'missing')
33 33 self.assertRaises(TraitError, FileContentsManager, root_dir=root)
34 34
35 35 def test_invalid_root_dir(self):
36 36 with NamedTemporaryFile() as tf:
37 37 self.assertRaises(TraitError, FileContentsManager, root_dir=tf.name)
38 38
39 39 def test_get_os_path(self):
40 40 # full filesystem path should be returned with correct operating system
41 41 # separators.
42 42 with TemporaryDirectory() as td:
43 43 root = td
44 44 fm = FileContentsManager(root_dir=root)
45 45 path = fm._get_os_path('/path/to/notebook/test.ipynb')
46 46 rel_path_list = '/path/to/notebook/test.ipynb'.split('/')
47 47 fs_path = os.path.join(fm.root_dir, *rel_path_list)
48 48 self.assertEqual(path, fs_path)
49 49
50 50 fm = FileContentsManager(root_dir=root)
51 51 path = fm._get_os_path('test.ipynb')
52 52 fs_path = os.path.join(fm.root_dir, 'test.ipynb')
53 53 self.assertEqual(path, fs_path)
54 54
55 55 fm = FileContentsManager(root_dir=root)
56 56 path = fm._get_os_path('////test.ipynb')
57 57 fs_path = os.path.join(fm.root_dir, 'test.ipynb')
58 58 self.assertEqual(path, fs_path)
59 59
60 60 def test_checkpoint_subdir(self):
61 61 subd = u'sub βˆ‚ir'
62 62 cp_name = 'test-cp.ipynb'
63 63 with TemporaryDirectory() as td:
64 64 root = td
65 65 os.mkdir(os.path.join(td, subd))
66 66 fm = FileContentsManager(root_dir=root)
67 67 cp_dir = fm.get_checkpoint_path('cp', 'test.ipynb')
68 68 cp_subdir = fm.get_checkpoint_path('cp', '/%s/test.ipynb' % subd)
69 69 self.assertNotEqual(cp_dir, cp_subdir)
70 70 self.assertEqual(cp_dir, os.path.join(root, fm.checkpoint_dir, cp_name))
71 71 self.assertEqual(cp_subdir, os.path.join(root, subd, fm.checkpoint_dir, cp_name))
72 72
73 73
74 74 class TestContentsManager(TestCase):
75 75
76 76 def setUp(self):
77 77 self._temp_dir = TemporaryDirectory()
78 78 self.td = self._temp_dir.name
79 79 self.contents_manager = FileContentsManager(
80 80 root_dir=self.td,
81 81 log=logging.getLogger()
82 82 )
83 83
84 84 def tearDown(self):
85 85 self._temp_dir.cleanup()
86 86
87 87 def make_dir(self, abs_path, rel_path):
88 88 """make subdirectory, rel_path is the relative path
89 89 to that directory from the location where the server started"""
90 90 os_path = os.path.join(abs_path, rel_path)
91 91 try:
92 92 os.makedirs(os_path)
93 93 except OSError:
94 94 print("Directory already exists: %r" % os_path)
95 95 return os_path
96 96
97 97 def add_code_cell(self, nb):
98 98 output = nbformat.new_output("display_data", {'application/javascript': "alert('hi');"})
99 99 cell = nbformat.new_code_cell("print('hi')", outputs=[output])
100 100 nb.cells.append(cell)
101 101
102 102 def new_notebook(self):
103 103 cm = self.contents_manager
104 104 model = cm.new_untitled(type='notebook')
105 105 name = model['name']
106 106 path = model['path']
107 107
108 108 full_model = cm.get_model(path)
109 109 nb = full_model['content']
110 110 self.add_code_cell(nb)
111 111
112 112 cm.save(full_model, path)
113 113 return nb, name, path
114 114
115 115 def test_new_untitled(self):
116 116 cm = self.contents_manager
117 117 # Test in root directory
118 118 model = cm.new_untitled(type='notebook')
119 119 assert isinstance(model, dict)
120 120 self.assertIn('name', model)
121 121 self.assertIn('path', model)
122 122 self.assertIn('type', model)
123 123 self.assertEqual(model['type'], 'notebook')
124 124 self.assertEqual(model['name'], 'Untitled0.ipynb')
125 125 self.assertEqual(model['path'], 'Untitled0.ipynb')
126 126
127 127 # Test in sub-directory
128 128 model = cm.new_untitled(type='directory')
129 129 assert isinstance(model, dict)
130 130 self.assertIn('name', model)
131 131 self.assertIn('path', model)
132 132 self.assertIn('type', model)
133 133 self.assertEqual(model['type'], 'directory')
134 134 self.assertEqual(model['name'], 'Untitled Folder0')
135 135 self.assertEqual(model['path'], 'Untitled Folder0')
136 136 sub_dir = model['path']
137 137
138 138 model = cm.new_untitled(path=sub_dir)
139 139 assert isinstance(model, dict)
140 140 self.assertIn('name', model)
141 141 self.assertIn('path', model)
142 142 self.assertIn('type', model)
143 143 self.assertEqual(model['type'], 'file')
144 144 self.assertEqual(model['name'], 'untitled0')
145 145 self.assertEqual(model['path'], '%s/untitled0' % sub_dir)
146 146
147 147 def test_get(self):
148 148 cm = self.contents_manager
149 149 # Create a notebook
150 150 model = cm.new_untitled(type='notebook')
151 151 name = model['name']
152 152 path = model['path']
153 153
154 154 # Check that we 'get' on the notebook we just created
155 155 model2 = cm.get_model(path)
156 156 assert isinstance(model2, dict)
157 157 self.assertIn('name', model2)
158 158 self.assertIn('path', model2)
159 159 self.assertEqual(model['name'], name)
160 160 self.assertEqual(model['path'], path)
161 161
162 nb_as_file = cm.get_model(path, content=True, type_='file')
163 self.assertEqual(nb_as_file['path'], path)
164 self.assertEqual(nb_as_file['type'], 'file')
165 self.assertEqual(nb_as_file['format'], 'text')
166 self.assertNotIsInstance(nb_as_file['content'], dict)
167
162 168 # Test in sub-directory
163 169 sub_dir = '/foo/'
164 170 self.make_dir(cm.root_dir, 'foo')
165 171 model = cm.new_untitled(path=sub_dir, ext='.ipynb')
166 172 model2 = cm.get_model(sub_dir + name)
167 173 assert isinstance(model2, dict)
168 174 self.assertIn('name', model2)
169 175 self.assertIn('path', model2)
170 176 self.assertIn('content', model2)
171 177 self.assertEqual(model2['name'], 'Untitled0.ipynb')
172 178 self.assertEqual(model2['path'], '{0}/{1}'.format(sub_dir.strip('/'), name))
173 179
180 # Test getting directory model
181 dirmodel = cm.get_model('foo')
182 self.assertEqual(dirmodel['type'], 'directory')
183
184 with self.assertRaises(HTTPError):
185 cm.get_model('foo', type_='file')
186
187
174 188 @dec.skip_win32
175 189 def test_bad_symlink(self):
176 190 cm = self.contents_manager
177 191 path = 'test bad symlink'
178 192 os_path = self.make_dir(cm.root_dir, path)
179 193
180 194 file_model = cm.new_untitled(path=path, ext='.txt')
181 195
182 196 # create a broken symlink
183 197 os.symlink("target", os.path.join(os_path, "bad symlink"))
184 198 model = cm.get_model(path)
185 199 self.assertEqual(model['content'], [file_model])
186 200
187 201 @dec.skip_win32
188 202 def test_good_symlink(self):
189 203 cm = self.contents_manager
190 204 parent = 'test good symlink'
191 205 name = 'good symlink'
192 206 path = '{0}/{1}'.format(parent, name)
193 207 os_path = self.make_dir(cm.root_dir, parent)
194 208
195 209 file_model = cm.new(path=parent + '/zfoo.txt')
196 210
197 211 # create a good symlink
198 212 os.symlink(file_model['name'], os.path.join(os_path, name))
199 213 symlink_model = cm.get_model(path, content=False)
200 214 dir_model = cm.get_model(parent)
201 215 self.assertEqual(
202 216 sorted(dir_model['content'], key=lambda x: x['name']),
203 217 [symlink_model, file_model],
204 218 )
205 219
206 220 def test_update(self):
207 221 cm = self.contents_manager
208 222 # Create a notebook
209 223 model = cm.new_untitled(type='notebook')
210 224 name = model['name']
211 225 path = model['path']
212 226
213 227 # Change the name in the model for rename
214 228 model['path'] = 'test.ipynb'
215 229 model = cm.update(model, path)
216 230 assert isinstance(model, dict)
217 231 self.assertIn('name', model)
218 232 self.assertIn('path', model)
219 233 self.assertEqual(model['name'], 'test.ipynb')
220 234
221 235 # Make sure the old name is gone
222 236 self.assertRaises(HTTPError, cm.get_model, path)
223 237
224 238 # Test in sub-directory
225 239 # Create a directory and notebook in that directory
226 240 sub_dir = '/foo/'
227 241 self.make_dir(cm.root_dir, 'foo')
228 242 model = cm.new_untitled(path=sub_dir, type='notebook')
229 243 name = model['name']
230 244 path = model['path']
231 245
232 246 # Change the name in the model for rename
233 247 d = path.rsplit('/', 1)[0]
234 248 new_path = model['path'] = d + '/test_in_sub.ipynb'
235 249 model = cm.update(model, path)
236 250 assert isinstance(model, dict)
237 251 self.assertIn('name', model)
238 252 self.assertIn('path', model)
239 253 self.assertEqual(model['name'], 'test_in_sub.ipynb')
240 254 self.assertEqual(model['path'], new_path)
241 255
242 256 # Make sure the old name is gone
243 257 self.assertRaises(HTTPError, cm.get_model, path)
244 258
245 259 def test_save(self):
246 260 cm = self.contents_manager
247 261 # Create a notebook
248 262 model = cm.new_untitled(type='notebook')
249 263 name = model['name']
250 264 path = model['path']
251 265
252 266 # Get the model with 'content'
253 267 full_model = cm.get_model(path)
254 268
255 269 # Save the notebook
256 270 model = cm.save(full_model, path)
257 271 assert isinstance(model, dict)
258 272 self.assertIn('name', model)
259 273 self.assertIn('path', model)
260 274 self.assertEqual(model['name'], name)
261 275 self.assertEqual(model['path'], path)
262 276
263 277 # Test in sub-directory
264 278 # Create a directory and notebook in that directory
265 279 sub_dir = '/foo/'
266 280 self.make_dir(cm.root_dir, 'foo')
267 281 model = cm.new_untitled(path=sub_dir, type='notebook')
268 282 name = model['name']
269 283 path = model['path']
270 284 model = cm.get_model(path)
271 285
272 286 # Change the name in the model for rename
273 287 model = cm.save(model, path)
274 288 assert isinstance(model, dict)
275 289 self.assertIn('name', model)
276 290 self.assertIn('path', model)
277 291 self.assertEqual(model['name'], 'Untitled0.ipynb')
278 292 self.assertEqual(model['path'], 'foo/Untitled0.ipynb')
279 293
280 294 def test_delete(self):
281 295 cm = self.contents_manager
282 296 # Create a notebook
283 297 nb, name, path = self.new_notebook()
284 298
285 299 # Delete the notebook
286 300 cm.delete(path)
287 301
288 302 # Check that a 'get' on the deleted notebook raises and error
289 303 self.assertRaises(HTTPError, cm.get_model, path)
290 304
291 305 def test_copy(self):
292 306 cm = self.contents_manager
293 307 parent = u'Γ₯ b'
294 308 name = u'nb √.ipynb'
295 309 path = u'{0}/{1}'.format(parent, name)
296 310 os.mkdir(os.path.join(cm.root_dir, parent))
297 311 orig = cm.new(path=path)
298 312
299 313 # copy with unspecified name
300 314 copy = cm.copy(path)
301 315 self.assertEqual(copy['name'], orig['name'].replace('.ipynb', '-Copy0.ipynb'))
302 316
303 317 # copy with specified name
304 318 copy2 = cm.copy(path, u'Γ₯ b/copy 2.ipynb')
305 319 self.assertEqual(copy2['name'], u'copy 2.ipynb')
306 320 self.assertEqual(copy2['path'], u'Γ₯ b/copy 2.ipynb')
307 321
308 322 def test_trust_notebook(self):
309 323 cm = self.contents_manager
310 324 nb, name, path = self.new_notebook()
311 325
312 326 untrusted = cm.get_model(path)['content']
313 327 assert not cm.notary.check_cells(untrusted)
314 328
315 329 # print(untrusted)
316 330 cm.trust_notebook(path)
317 331 trusted = cm.get_model(path)['content']
318 332 # print(trusted)
319 333 assert cm.notary.check_cells(trusted)
320 334
321 335 def test_mark_trusted_cells(self):
322 336 cm = self.contents_manager
323 337 nb, name, path = self.new_notebook()
324 338
325 339 cm.mark_trusted_cells(nb, path)
326 340 for cell in nb.cells:
327 341 if cell.cell_type == 'code':
328 342 assert not cell.metadata.trusted
329 343
330 344 cm.trust_notebook(path)
331 345 nb = cm.get_model(path)['content']
332 346 for cell in nb.cells:
333 347 if cell.cell_type == 'code':
334 348 assert cell.metadata.trusted
335 349
336 350 def test_check_and_sign(self):
337 351 cm = self.contents_manager
338 352 nb, name, path = self.new_notebook()
339 353
340 354 cm.mark_trusted_cells(nb, path)
341 355 cm.check_and_sign(nb, path)
342 356 assert not cm.notary.check_signature(nb)
343 357
344 358 cm.trust_notebook(path)
345 359 nb = cm.get_model(path)['content']
346 360 cm.mark_trusted_cells(nb, path)
347 361 cm.check_and_sign(nb, path)
348 362 assert cm.notary.check_signature(nb)
General Comments 0
You need to be logged in to leave comments. Login now