##// END OF EJS Templates
Backport PR #5708: create checkpoints dir in notebook subdirectories...
Thomas Kluyver -
Show More
@@ -1,494 +1,482 b''
1 1 """A notebook manager that uses the local file system for storage.
2 2
3 3 Authors:
4 4
5 5 * Brian Granger
6 6 * Zach Sailer
7 7 """
8 8
9 9 #-----------------------------------------------------------------------------
10 10 # Copyright (C) 2011 The IPython Development Team
11 11 #
12 12 # Distributed under the terms of the BSD License. The full license is in
13 13 # the file COPYING, distributed as part of this software.
14 14 #-----------------------------------------------------------------------------
15 15
16 16 #-----------------------------------------------------------------------------
17 17 # Imports
18 18 #-----------------------------------------------------------------------------
19 19
20 20 import io
21 21 import os
22 22 import glob
23 23 import shutil
24 24
25 25 from tornado import web
26 26
27 27 from .nbmanager import NotebookManager
28 28 from IPython.nbformat import current
29 29 from IPython.utils.traitlets import Unicode, Bool, TraitError
30 30 from IPython.utils.py3compat import getcwd
31 31 from IPython.utils import tz
32 32 from IPython.html.utils import is_hidden, to_os_path
33 33
34 34 def sort_key(item):
35 35 """Case-insensitive sorting."""
36 36 return item['name'].lower()
37 37
38 38 #-----------------------------------------------------------------------------
39 39 # Classes
40 40 #-----------------------------------------------------------------------------
41 41
42 42 class FileNotebookManager(NotebookManager):
43 43
44 44 save_script = Bool(False, config=True,
45 45 help="""Automatically create a Python script when saving the notebook.
46 46
47 47 For easier use of import, %run and %load across notebooks, a
48 48 <notebook-name>.py script will be created next to any
49 49 <notebook-name>.ipynb on each save. This can also be set with the
50 50 short `--script` flag.
51 51 """
52 52 )
53 53 notebook_dir = Unicode(getcwd(), config=True)
54 54
55 55 def _notebook_dir_changed(self, name, old, new):
56 56 """Do a bit of validation of the notebook dir."""
57 57 if not os.path.isabs(new):
58 58 # If we receive a non-absolute path, make it absolute.
59 59 self.notebook_dir = os.path.abspath(new)
60 60 return
61 61 if not os.path.exists(new) or not os.path.isdir(new):
62 62 raise TraitError("notebook dir %r is not a directory" % new)
63
64 checkpoint_dir = Unicode(config=True,
65 help="""The location in which to keep notebook checkpoints
63
64 checkpoint_dir = Unicode('.ipynb_checkpoints', config=True,
65 help="""The directory name in which to keep notebook checkpoints
66
67 This is a path relative to the notebook's own directory.
66 68
67 By default, it is notebook-dir/.ipynb_checkpoints
69 By default, it is .ipynb_checkpoints
68 70 """
69 71 )
70 def _checkpoint_dir_default(self):
71 return os.path.join(self.notebook_dir, '.ipynb_checkpoints')
72
73 def _checkpoint_dir_changed(self, name, old, new):
74 """do a bit of validation of the checkpoint dir"""
75 if not os.path.isabs(new):
76 # If we receive a non-absolute path, make it absolute.
77 abs_new = os.path.abspath(new)
78 self.checkpoint_dir = abs_new
79 return
80 if os.path.exists(new) and not os.path.isdir(new):
81 raise TraitError("checkpoint dir %r is not a directory" % new)
82 if not os.path.exists(new):
83 self.log.info("Creating checkpoint dir %s", new)
84 try:
85 os.mkdir(new)
86 except:
87 raise TraitError("Couldn't create checkpoint dir %r" % new)
88 72
89 73 def _copy(self, src, dest):
90 74 """copy src to dest
91 75
92 76 like shutil.copy2, but log errors in copystat
93 77 """
94 78 shutil.copyfile(src, dest)
95 79 try:
96 80 shutil.copystat(src, dest)
97 81 except OSError as e:
98 82 self.log.debug("copystat on %s failed", dest, exc_info=True)
99 83
100 84 def get_notebook_names(self, path=''):
101 85 """List all notebook names in the notebook dir and path."""
102 86 path = path.strip('/')
103 87 if not os.path.isdir(self._get_os_path(path=path)):
104 88 raise web.HTTPError(404, 'Directory not found: ' + path)
105 89 names = glob.glob(self._get_os_path('*'+self.filename_ext, path))
106 90 names = [os.path.basename(name)
107 91 for name in names]
108 92 return names
109 93
110 94 def path_exists(self, path):
111 95 """Does the API-style path (directory) actually exist?
112 96
113 97 Parameters
114 98 ----------
115 99 path : string
116 100 The path to check. This is an API path (`/` separated,
117 101 relative to base notebook-dir).
118 102
119 103 Returns
120 104 -------
121 105 exists : bool
122 106 Whether the path is indeed a directory.
123 107 """
124 108 path = path.strip('/')
125 109 os_path = self._get_os_path(path=path)
126 110 return os.path.isdir(os_path)
127 111
128 112 def is_hidden(self, path):
129 113 """Does the API style path correspond to a hidden directory or file?
130 114
131 115 Parameters
132 116 ----------
133 117 path : string
134 118 The path to check. This is an API path (`/` separated,
135 119 relative to base notebook-dir).
136 120
137 121 Returns
138 122 -------
139 123 exists : bool
140 124 Whether the path is hidden.
141 125
142 126 """
143 127 path = path.strip('/')
144 128 os_path = self._get_os_path(path=path)
145 129 return is_hidden(os_path, self.notebook_dir)
146 130
147 131 def _get_os_path(self, name=None, path=''):
148 132 """Given a notebook name and a URL path, return its file system
149 133 path.
150 134
151 135 Parameters
152 136 ----------
153 137 name : string
154 138 The name of a notebook file with the .ipynb extension
155 139 path : string
156 140 The relative URL path (with '/' as separator) to the named
157 141 notebook.
158 142
159 143 Returns
160 144 -------
161 145 path : string
162 146 A file system path that combines notebook_dir (location where
163 147 server started), the relative path, and the filename with the
164 148 current operating system's url.
165 149 """
166 150 if name is not None:
167 151 path = path + '/' + name
168 152 return to_os_path(path, self.notebook_dir)
169 153
170 154 def notebook_exists(self, name, path=''):
171 155 """Returns a True if the notebook exists. Else, returns False.
172 156
173 157 Parameters
174 158 ----------
175 159 name : string
176 160 The name of the notebook you are checking.
177 161 path : string
178 162 The relative path to the notebook (with '/' as separator)
179 163
180 164 Returns
181 165 -------
182 166 bool
183 167 """
184 168 path = path.strip('/')
185 169 nbpath = self._get_os_path(name, path=path)
186 170 return os.path.isfile(nbpath)
187 171
188 172 # TODO: Remove this after we create the contents web service and directories are
189 173 # no longer listed by the notebook web service.
190 174 def list_dirs(self, path):
191 175 """List the directories for a given API style path."""
192 176 path = path.strip('/')
193 177 os_path = self._get_os_path('', path)
194 178 if not os.path.isdir(os_path):
195 179 raise web.HTTPError(404, u'directory does not exist: %r' % os_path)
196 180 elif is_hidden(os_path, self.notebook_dir):
197 181 self.log.info("Refusing to serve hidden directory, via 404 Error")
198 182 raise web.HTTPError(404, u'directory does not exist: %r' % os_path)
199 183 dir_names = os.listdir(os_path)
200 184 dirs = []
201 185 for name in dir_names:
202 186 os_path = self._get_os_path(name, path)
203 187 if os.path.isdir(os_path) and not is_hidden(os_path, self.notebook_dir)\
204 188 and self.should_list(name):
205 189 try:
206 190 model = self.get_dir_model(name, path)
207 191 except IOError:
208 192 pass
209 193 dirs.append(model)
210 194 dirs = sorted(dirs, key=sort_key)
211 195 return dirs
212 196
213 197 # TODO: Remove this after we create the contents web service and directories are
214 198 # no longer listed by the notebook web service.
215 199 def get_dir_model(self, name, path=''):
216 200 """Get the directory model given a directory name and its API style path"""
217 201 path = path.strip('/')
218 202 os_path = self._get_os_path(name, path)
219 203 if not os.path.isdir(os_path):
220 204 raise IOError('directory does not exist: %r' % os_path)
221 205 info = os.stat(os_path)
222 206 last_modified = tz.utcfromtimestamp(info.st_mtime)
223 207 created = tz.utcfromtimestamp(info.st_ctime)
224 208 # Create the notebook model.
225 209 model ={}
226 210 model['name'] = name
227 211 model['path'] = path
228 212 model['last_modified'] = last_modified
229 213 model['created'] = created
230 214 model['type'] = 'directory'
231 215 return model
232 216
233 217 def list_notebooks(self, path):
234 218 """Returns a list of dictionaries that are the standard model
235 219 for all notebooks in the relative 'path'.
236 220
237 221 Parameters
238 222 ----------
239 223 path : str
240 224 the URL path that describes the relative path for the
241 225 listed notebooks
242 226
243 227 Returns
244 228 -------
245 229 notebooks : list of dicts
246 230 a list of the notebook models without 'content'
247 231 """
248 232 path = path.strip('/')
249 233 notebook_names = self.get_notebook_names(path)
250 234 notebooks = [self.get_notebook(name, path, content=False)
251 235 for name in notebook_names if self.should_list(name)]
252 236 notebooks = sorted(notebooks, key=sort_key)
253 237 return notebooks
254 238
255 239 def get_notebook(self, name, path='', content=True):
256 240 """ Takes a path and name for a notebook and returns its model
257 241
258 242 Parameters
259 243 ----------
260 244 name : str
261 245 the name of the notebook
262 246 path : str
263 247 the URL path that describes the relative path for
264 248 the notebook
265 249
266 250 Returns
267 251 -------
268 252 model : dict
269 253 the notebook model. If contents=True, returns the 'contents'
270 254 dict in the model as well.
271 255 """
272 256 path = path.strip('/')
273 257 if not self.notebook_exists(name=name, path=path):
274 258 raise web.HTTPError(404, u'Notebook does not exist: %s' % name)
275 259 os_path = self._get_os_path(name, path)
276 260 info = os.stat(os_path)
277 261 last_modified = tz.utcfromtimestamp(info.st_mtime)
278 262 created = tz.utcfromtimestamp(info.st_ctime)
279 263 # Create the notebook model.
280 264 model ={}
281 265 model['name'] = name
282 266 model['path'] = path
283 267 model['last_modified'] = last_modified
284 268 model['created'] = created
285 269 model['type'] = 'notebook'
286 270 if content:
287 271 with io.open(os_path, 'r', encoding='utf-8') as f:
288 272 try:
289 273 nb = current.read(f, u'json')
290 274 except Exception as e:
291 275 raise web.HTTPError(400, u"Unreadable Notebook: %s %s" % (os_path, e))
292 276 self.mark_trusted_cells(nb, name, path)
293 277 model['content'] = nb
294 278 return model
295 279
296 280 def save_notebook(self, model, name='', path=''):
297 281 """Save the notebook model and return the model with no content."""
298 282 path = path.strip('/')
299 283
300 284 if 'content' not in model:
301 285 raise web.HTTPError(400, u'No notebook JSON data provided')
302 286
303 287 # One checkpoint should always exist
304 288 if self.notebook_exists(name, path) and not self.list_checkpoints(name, path):
305 289 self.create_checkpoint(name, path)
306 290
307 291 new_path = model.get('path', path).strip('/')
308 292 new_name = model.get('name', name)
309 293
310 294 if path != new_path or name != new_name:
311 295 self.rename_notebook(name, path, new_name, new_path)
312 296
313 297 # Save the notebook file
314 298 os_path = self._get_os_path(new_name, new_path)
315 299 nb = current.to_notebook_json(model['content'])
316 300
317 301 self.check_and_sign(nb, new_name, new_path)
318 302
319 303 if 'name' in nb['metadata']:
320 304 nb['metadata']['name'] = u''
321 305 try:
322 306 self.log.debug("Autosaving notebook %s", os_path)
323 307 with io.open(os_path, 'w', encoding='utf-8') as f:
324 308 current.write(nb, f, u'json')
325 309 except Exception as e:
326 310 raise web.HTTPError(400, u'Unexpected error while autosaving notebook: %s %s' % (os_path, e))
327 311
328 312 # Save .py script as well
329 313 if self.save_script:
330 314 py_path = os.path.splitext(os_path)[0] + '.py'
331 315 self.log.debug("Writing script %s", py_path)
332 316 try:
333 317 with io.open(py_path, 'w', encoding='utf-8') as f:
334 318 current.write(nb, f, u'py')
335 319 except Exception as e:
336 320 raise web.HTTPError(400, u'Unexpected error while saving notebook as script: %s %s' % (py_path, e))
337 321
338 322 model = self.get_notebook(new_name, new_path, content=False)
339 323 return model
340 324
341 325 def update_notebook(self, model, name, path=''):
342 326 """Update the notebook's path and/or name"""
343 327 path = path.strip('/')
344 328 new_name = model.get('name', name)
345 329 new_path = model.get('path', path).strip('/')
346 330 if path != new_path or name != new_name:
347 331 self.rename_notebook(name, path, new_name, new_path)
348 332 model = self.get_notebook(new_name, new_path, content=False)
349 333 return model
350 334
351 335 def delete_notebook(self, name, path=''):
352 336 """Delete notebook by name and path."""
353 337 path = path.strip('/')
354 338 os_path = self._get_os_path(name, path)
355 339 if not os.path.isfile(os_path):
356 340 raise web.HTTPError(404, u'Notebook does not exist: %s' % os_path)
357 341
358 342 # clear checkpoints
359 343 for checkpoint in self.list_checkpoints(name, path):
360 344 checkpoint_id = checkpoint['id']
361 345 cp_path = self.get_checkpoint_path(checkpoint_id, name, path)
362 346 if os.path.isfile(cp_path):
363 347 self.log.debug("Unlinking checkpoint %s", cp_path)
364 348 os.unlink(cp_path)
365 349
366 350 self.log.debug("Unlinking notebook %s", os_path)
367 351 os.unlink(os_path)
368 352
369 353 def rename_notebook(self, old_name, old_path, new_name, new_path):
370 354 """Rename a notebook."""
371 355 old_path = old_path.strip('/')
372 356 new_path = new_path.strip('/')
373 357 if new_name == old_name and new_path == old_path:
374 358 return
375 359
376 360 new_os_path = self._get_os_path(new_name, new_path)
377 361 old_os_path = self._get_os_path(old_name, old_path)
378 362
379 363 # Should we proceed with the move?
380 364 if os.path.isfile(new_os_path):
381 365 raise web.HTTPError(409, u'Notebook with name already exists: %s' % new_os_path)
382 366 if self.save_script:
383 367 old_py_path = os.path.splitext(old_os_path)[0] + '.py'
384 368 new_py_path = os.path.splitext(new_os_path)[0] + '.py'
385 369 if os.path.isfile(new_py_path):
386 370 raise web.HTTPError(409, u'Python script with name already exists: %s' % new_py_path)
387 371
388 372 # Move the notebook file
389 373 try:
390 374 shutil.move(old_os_path, new_os_path)
391 375 except Exception as e:
392 376 raise web.HTTPError(500, u'Unknown error renaming notebook: %s %s' % (old_os_path, e))
393 377
394 378 # Move the checkpoints
395 379 old_checkpoints = self.list_checkpoints(old_name, old_path)
396 380 for cp in old_checkpoints:
397 381 checkpoint_id = cp['id']
398 382 old_cp_path = self.get_checkpoint_path(checkpoint_id, old_name, old_path)
399 383 new_cp_path = self.get_checkpoint_path(checkpoint_id, new_name, new_path)
400 384 if os.path.isfile(old_cp_path):
401 385 self.log.debug("Renaming checkpoint %s -> %s", old_cp_path, new_cp_path)
402 386 shutil.move(old_cp_path, new_cp_path)
403 387
404 388 # Move the .py script
405 389 if self.save_script:
406 390 shutil.move(old_py_path, new_py_path)
407 391
408 392 # Checkpoint-related utilities
409 393
410 394 def get_checkpoint_path(self, checkpoint_id, name, path=''):
411 395 """find the path to a checkpoint"""
412 396 path = path.strip('/')
413 397 basename, _ = os.path.splitext(name)
414 398 filename = u"{name}-{checkpoint_id}{ext}".format(
415 399 name=basename,
416 400 checkpoint_id=checkpoint_id,
417 401 ext=self.filename_ext,
418 402 )
419 cp_path = os.path.join(path, self.checkpoint_dir, filename)
403 os_path = self._get_os_path(path=path)
404 cp_dir = os.path.join(os_path, self.checkpoint_dir)
405 if not os.path.exists(cp_dir):
406 os.mkdir(cp_dir)
407 cp_path = os.path.join(cp_dir, filename)
420 408 return cp_path
421 409
422 410 def get_checkpoint_model(self, checkpoint_id, name, path=''):
423 411 """construct the info dict for a given checkpoint"""
424 412 path = path.strip('/')
425 413 cp_path = self.get_checkpoint_path(checkpoint_id, name, path)
426 414 stats = os.stat(cp_path)
427 415 last_modified = tz.utcfromtimestamp(stats.st_mtime)
428 416 info = dict(
429 417 id = checkpoint_id,
430 418 last_modified = last_modified,
431 419 )
432 420 return info
433 421
434 422 # public checkpoint API
435 423
436 424 def create_checkpoint(self, name, path=''):
437 425 """Create a checkpoint from the current state of a notebook"""
438 426 path = path.strip('/')
439 427 nb_path = self._get_os_path(name, path)
440 428 # only the one checkpoint ID:
441 429 checkpoint_id = u"checkpoint"
442 430 cp_path = self.get_checkpoint_path(checkpoint_id, name, path)
443 431 self.log.debug("creating checkpoint for notebook %s", name)
444 432 if not os.path.exists(self.checkpoint_dir):
445 433 os.mkdir(self.checkpoint_dir)
446 434 self._copy(nb_path, cp_path)
447 435
448 436 # return the checkpoint info
449 437 return self.get_checkpoint_model(checkpoint_id, name, path)
450 438
451 439 def list_checkpoints(self, name, path=''):
452 440 """list the checkpoints for a given notebook
453 441
454 442 This notebook manager currently only supports one checkpoint per notebook.
455 443 """
456 444 path = path.strip('/')
457 445 checkpoint_id = "checkpoint"
458 path = self.get_checkpoint_path(checkpoint_id, name, path)
459 if not os.path.exists(path):
446 os_path = self.get_checkpoint_path(checkpoint_id, name, path)
447 if not os.path.exists(os_path):
460 448 return []
461 449 else:
462 450 return [self.get_checkpoint_model(checkpoint_id, name, path)]
463 451
464 452
465 453 def restore_checkpoint(self, checkpoint_id, name, path=''):
466 454 """restore a notebook to a checkpointed state"""
467 455 path = path.strip('/')
468 456 self.log.info("restoring Notebook %s from checkpoint %s", name, checkpoint_id)
469 457 nb_path = self._get_os_path(name, path)
470 458 cp_path = self.get_checkpoint_path(checkpoint_id, name, path)
471 459 if not os.path.isfile(cp_path):
472 460 self.log.debug("checkpoint file does not exist: %s", cp_path)
473 461 raise web.HTTPError(404,
474 462 u'Notebook checkpoint does not exist: %s-%s' % (name, checkpoint_id)
475 463 )
476 464 # ensure notebook is readable (never restore from an unreadable notebook)
477 465 with io.open(cp_path, 'r', encoding='utf-8') as f:
478 466 current.read(f, u'json')
479 467 self._copy(cp_path, nb_path)
480 468 self.log.debug("copying %s -> %s", cp_path, nb_path)
481 469
482 470 def delete_checkpoint(self, checkpoint_id, name, path=''):
483 471 """delete a notebook's checkpoint"""
484 472 path = path.strip('/')
485 473 cp_path = self.get_checkpoint_path(checkpoint_id, name, path)
486 474 if not os.path.isfile(cp_path):
487 475 raise web.HTTPError(404,
488 476 u'Notebook checkpoint does not exist: %s%s-%s' % (path, name, checkpoint_id)
489 477 )
490 478 self.log.debug("unlinking %s", cp_path)
491 479 os.unlink(cp_path)
492 480
493 481 def info_string(self):
494 482 return "Serving notebooks from local directory: %s" % self.notebook_dir
@@ -1,306 +1,320 b''
1 1 # coding: utf-8
2 2 """Tests for the notebook manager."""
3 3 from __future__ import print_function
4 4
5 5 import logging
6 6 import os
7 7
8 8 from tornado.web import HTTPError
9 9 from unittest import TestCase
10 10 from tempfile import NamedTemporaryFile
11 11
12 12 from IPython.nbformat import current
13 13
14 14 from IPython.utils.tempdir import TemporaryDirectory
15 15 from IPython.utils.traitlets import TraitError
16 16 from IPython.html.utils import url_path_join
17 17
18 18 from ..filenbmanager import FileNotebookManager
19 19 from ..nbmanager import NotebookManager
20 20
21 21
22 22 class TestFileNotebookManager(TestCase):
23 23
24 24 def test_nb_dir(self):
25 25 with TemporaryDirectory() as td:
26 26 fm = FileNotebookManager(notebook_dir=td)
27 27 self.assertEqual(fm.notebook_dir, td)
28 28
29 29 def test_missing_nb_dir(self):
30 30 with TemporaryDirectory() as td:
31 31 nbdir = os.path.join(td, 'notebook', 'dir', 'is', 'missing')
32 32 self.assertRaises(TraitError, FileNotebookManager, notebook_dir=nbdir)
33 33
34 34 def test_invalid_nb_dir(self):
35 35 with NamedTemporaryFile() as tf:
36 36 self.assertRaises(TraitError, FileNotebookManager, notebook_dir=tf.name)
37 37
38 38 def test_get_os_path(self):
39 39 # full filesystem path should be returned with correct operating system
40 40 # separators.
41 41 with TemporaryDirectory() as td:
42 42 nbdir = td
43 43 fm = FileNotebookManager(notebook_dir=nbdir)
44 44 path = fm._get_os_path('test.ipynb', '/path/to/notebook/')
45 45 rel_path_list = '/path/to/notebook/test.ipynb'.split('/')
46 46 fs_path = os.path.join(fm.notebook_dir, *rel_path_list)
47 47 self.assertEqual(path, fs_path)
48 48
49 49 fm = FileNotebookManager(notebook_dir=nbdir)
50 50 path = fm._get_os_path('test.ipynb')
51 51 fs_path = os.path.join(fm.notebook_dir, 'test.ipynb')
52 52 self.assertEqual(path, fs_path)
53 53
54 54 fm = FileNotebookManager(notebook_dir=nbdir)
55 55 path = fm._get_os_path('test.ipynb', '////')
56 56 fs_path = os.path.join(fm.notebook_dir, 'test.ipynb')
57 57 self.assertEqual(path, fs_path)
58
59 def test_checkpoint_subdir(self):
60 subd = u'sub βˆ‚ir'
61 cp_name = 'test-cp.ipynb'
62 with TemporaryDirectory() as td:
63 nbdir = td
64 os.mkdir(os.path.join(td, subd))
65 fm = FileNotebookManager(notebook_dir=nbdir)
66 cp_dir = fm.get_checkpoint_path('cp', 'test.ipynb', '/')
67 cp_subdir = fm.get_checkpoint_path('cp', 'test.ipynb', '/%s/' % subd)
68 self.assertNotEqual(cp_dir, cp_subdir)
69 self.assertEqual(cp_dir, os.path.join(nbdir, fm.checkpoint_dir, cp_name))
70 self.assertEqual(cp_subdir, os.path.join(nbdir, subd, fm.checkpoint_dir, cp_name))
71
58 72
59 73 class TestNotebookManager(TestCase):
60 74
61 75 def setUp(self):
62 76 self._temp_dir = TemporaryDirectory()
63 77 self.td = self._temp_dir.name
64 78 self.notebook_manager = FileNotebookManager(
65 79 notebook_dir=self.td,
66 80 log=logging.getLogger()
67 81 )
68 82
69 83 def tearDown(self):
70 84 self._temp_dir.cleanup()
71 85
72 86 def make_dir(self, abs_path, rel_path):
73 87 """make subdirectory, rel_path is the relative path
74 88 to that directory from the location where the server started"""
75 89 os_path = os.path.join(abs_path, rel_path)
76 90 try:
77 91 os.makedirs(os_path)
78 92 except OSError:
79 93 print("Directory already exists: %r" % os_path)
80 94
81 95 def add_code_cell(self, nb):
82 96 output = current.new_output("display_data", output_javascript="alert('hi');")
83 97 cell = current.new_code_cell("print('hi')", outputs=[output])
84 98 if not nb.worksheets:
85 99 nb.worksheets.append(current.new_worksheet())
86 100 nb.worksheets[0].cells.append(cell)
87 101
88 102 def new_notebook(self):
89 103 nbm = self.notebook_manager
90 104 model = nbm.create_notebook()
91 105 name = model['name']
92 106 path = model['path']
93 107
94 108 full_model = nbm.get_notebook(name, path)
95 109 nb = full_model['content']
96 110 self.add_code_cell(nb)
97 111
98 112 nbm.save_notebook(full_model, name, path)
99 113 return nb, name, path
100 114
101 115 def test_create_notebook(self):
102 116 nm = self.notebook_manager
103 117 # Test in root directory
104 118 model = nm.create_notebook()
105 119 assert isinstance(model, dict)
106 120 self.assertIn('name', model)
107 121 self.assertIn('path', model)
108 122 self.assertEqual(model['name'], 'Untitled0.ipynb')
109 123 self.assertEqual(model['path'], '')
110 124
111 125 # Test in sub-directory
112 126 sub_dir = '/foo/'
113 127 self.make_dir(nm.notebook_dir, 'foo')
114 128 model = nm.create_notebook(None, sub_dir)
115 129 assert isinstance(model, dict)
116 130 self.assertIn('name', model)
117 131 self.assertIn('path', model)
118 132 self.assertEqual(model['name'], 'Untitled0.ipynb')
119 133 self.assertEqual(model['path'], sub_dir.strip('/'))
120 134
121 135 def test_get_notebook(self):
122 136 nm = self.notebook_manager
123 137 # Create a notebook
124 138 model = nm.create_notebook()
125 139 name = model['name']
126 140 path = model['path']
127 141
128 142 # Check that we 'get' on the notebook we just created
129 143 model2 = nm.get_notebook(name, path)
130 144 assert isinstance(model2, dict)
131 145 self.assertIn('name', model2)
132 146 self.assertIn('path', model2)
133 147 self.assertEqual(model['name'], name)
134 148 self.assertEqual(model['path'], path)
135 149
136 150 # Test in sub-directory
137 151 sub_dir = '/foo/'
138 152 self.make_dir(nm.notebook_dir, 'foo')
139 153 model = nm.create_notebook(None, sub_dir)
140 154 model2 = nm.get_notebook(name, sub_dir)
141 155 assert isinstance(model2, dict)
142 156 self.assertIn('name', model2)
143 157 self.assertIn('path', model2)
144 158 self.assertIn('content', model2)
145 159 self.assertEqual(model2['name'], 'Untitled0.ipynb')
146 160 self.assertEqual(model2['path'], sub_dir.strip('/'))
147 161
148 162 def test_update_notebook(self):
149 163 nm = self.notebook_manager
150 164 # Create a notebook
151 165 model = nm.create_notebook()
152 166 name = model['name']
153 167 path = model['path']
154 168
155 169 # Change the name in the model for rename
156 170 model['name'] = 'test.ipynb'
157 171 model = nm.update_notebook(model, name, path)
158 172 assert isinstance(model, dict)
159 173 self.assertIn('name', model)
160 174 self.assertIn('path', model)
161 175 self.assertEqual(model['name'], 'test.ipynb')
162 176
163 177 # Make sure the old name is gone
164 178 self.assertRaises(HTTPError, nm.get_notebook, name, path)
165 179
166 180 # Test in sub-directory
167 181 # Create a directory and notebook in that directory
168 182 sub_dir = '/foo/'
169 183 self.make_dir(nm.notebook_dir, 'foo')
170 184 model = nm.create_notebook(None, sub_dir)
171 185 name = model['name']
172 186 path = model['path']
173 187
174 188 # Change the name in the model for rename
175 189 model['name'] = 'test_in_sub.ipynb'
176 190 model = nm.update_notebook(model, name, path)
177 191 assert isinstance(model, dict)
178 192 self.assertIn('name', model)
179 193 self.assertIn('path', model)
180 194 self.assertEqual(model['name'], 'test_in_sub.ipynb')
181 195 self.assertEqual(model['path'], sub_dir.strip('/'))
182 196
183 197 # Make sure the old name is gone
184 198 self.assertRaises(HTTPError, nm.get_notebook, name, path)
185 199
186 200 def test_save_notebook(self):
187 201 nm = self.notebook_manager
188 202 # Create a notebook
189 203 model = nm.create_notebook()
190 204 name = model['name']
191 205 path = model['path']
192 206
193 207 # Get the model with 'content'
194 208 full_model = nm.get_notebook(name, path)
195 209
196 210 # Save the notebook
197 211 model = nm.save_notebook(full_model, name, path)
198 212 assert isinstance(model, dict)
199 213 self.assertIn('name', model)
200 214 self.assertIn('path', model)
201 215 self.assertEqual(model['name'], name)
202 216 self.assertEqual(model['path'], path)
203 217
204 218 # Test in sub-directory
205 219 # Create a directory and notebook in that directory
206 220 sub_dir = '/foo/'
207 221 self.make_dir(nm.notebook_dir, 'foo')
208 222 model = nm.create_notebook(None, sub_dir)
209 223 name = model['name']
210 224 path = model['path']
211 225 model = nm.get_notebook(name, path)
212 226
213 227 # Change the name in the model for rename
214 228 model = nm.save_notebook(model, name, path)
215 229 assert isinstance(model, dict)
216 230 self.assertIn('name', model)
217 231 self.assertIn('path', model)
218 232 self.assertEqual(model['name'], 'Untitled0.ipynb')
219 233 self.assertEqual(model['path'], sub_dir.strip('/'))
220 234
221 235 def test_save_notebook_with_script(self):
222 236 nm = self.notebook_manager
223 237 # Create a notebook
224 238 model = nm.create_notebook()
225 239 nm.save_script = True
226 240 model = nm.create_notebook()
227 241 name = model['name']
228 242 path = model['path']
229 243
230 244 # Get the model with 'content'
231 245 full_model = nm.get_notebook(name, path)
232 246
233 247 # Save the notebook
234 248 model = nm.save_notebook(full_model, name, path)
235 249
236 250 # Check that the script was created
237 251 py_path = os.path.join(nm.notebook_dir, os.path.splitext(name)[0]+'.py')
238 252 assert os.path.exists(py_path), py_path
239 253
240 254 def test_delete_notebook(self):
241 255 nm = self.notebook_manager
242 256 # Create a notebook
243 257 nb, name, path = self.new_notebook()
244 258
245 259 # Delete the notebook
246 260 nm.delete_notebook(name, path)
247 261
248 262 # Check that a 'get' on the deleted notebook raises and error
249 263 self.assertRaises(HTTPError, nm.get_notebook, name, path)
250 264
251 265 def test_copy_notebook(self):
252 266 nm = self.notebook_manager
253 267 path = u'Γ₯ b'
254 268 name = u'nb √.ipynb'
255 269 os.mkdir(os.path.join(nm.notebook_dir, path))
256 270 orig = nm.create_notebook({'name' : name}, path=path)
257 271
258 272 # copy with unspecified name
259 273 copy = nm.copy_notebook(name, path=path)
260 274 self.assertEqual(copy['name'], orig['name'].replace('.ipynb', '-Copy0.ipynb'))
261 275
262 276 # copy with specified name
263 277 copy2 = nm.copy_notebook(name, u'copy 2.ipynb', path=path)
264 278 self.assertEqual(copy2['name'], u'copy 2.ipynb')
265 279
266 280 def test_trust_notebook(self):
267 281 nbm = self.notebook_manager
268 282 nb, name, path = self.new_notebook()
269 283
270 284 untrusted = nbm.get_notebook(name, path)['content']
271 285 assert not nbm.notary.check_cells(untrusted)
272 286
273 287 # print(untrusted)
274 288 nbm.trust_notebook(name, path)
275 289 trusted = nbm.get_notebook(name, path)['content']
276 290 # print(trusted)
277 291 assert nbm.notary.check_cells(trusted)
278 292
279 293 def test_mark_trusted_cells(self):
280 294 nbm = self.notebook_manager
281 295 nb, name, path = self.new_notebook()
282 296
283 297 nbm.mark_trusted_cells(nb, name, path)
284 298 for cell in nb.worksheets[0].cells:
285 299 if cell.cell_type == 'code':
286 300 assert not cell.trusted
287 301
288 302 nbm.trust_notebook(name, path)
289 303 nb = nbm.get_notebook(name, path)['content']
290 304 for cell in nb.worksheets[0].cells:
291 305 if cell.cell_type == 'code':
292 306 assert cell.trusted
293 307
294 308 def test_check_and_sign(self):
295 309 nbm = self.notebook_manager
296 310 nb, name, path = self.new_notebook()
297 311
298 312 nbm.mark_trusted_cells(nb, name, path)
299 313 nbm.check_and_sign(nb, name, path)
300 314 assert not nbm.notary.check_signature(nb)
301 315
302 316 nbm.trust_notebook(name, path)
303 317 nb = nbm.get_notebook(name, path)['content']
304 318 nbm.mark_trusted_cells(nb, name, path)
305 319 nbm.check_and_sign(nb, name, path)
306 320 assert nbm.notary.check_signature(nb)
General Comments 0
You need to be logged in to leave comments. Login now