##// END OF EJS Templates
Get the existing tests working.
Brian E. Granger -
Show More
@@ -1,455 +1,459 b''
1 1 """A notebook manager that uses the local file system for storage.
2 2
3 3 Authors:
4 4
5 5 * Brian Granger
6 6 * Zach Sailer
7 7 """
8 8
9 9 #-----------------------------------------------------------------------------
10 10 # Copyright (C) 2011 The IPython Development Team
11 11 #
12 12 # Distributed under the terms of the BSD License. The full license is in
13 13 # the file COPYING, distributed as part of this software.
14 14 #-----------------------------------------------------------------------------
15 15
16 16 #-----------------------------------------------------------------------------
17 17 # Imports
18 18 #-----------------------------------------------------------------------------
19 19
20 20 import io
21 21 import itertools
22 22 import os
23 23 import glob
24 24 import shutil
25 25
26 26 from tornado import web
27 27
28 28 from .nbmanager import NotebookManager
29 29 from IPython.nbformat import current
30 30 from IPython.utils.traitlets import Unicode, Dict, Bool, TraitError
31 31 from IPython.utils import tz
32 32
33 33 #-----------------------------------------------------------------------------
34 34 # Classes
35 35 #-----------------------------------------------------------------------------
36 36
37 37 class FileNotebookManager(NotebookManager):
38 38
39 39 save_script = Bool(False, config=True,
40 40 help="""Automatically create a Python script when saving the notebook.
41 41
42 42 For easier use of import, %run and %load across notebooks, a
43 43 <notebook-name>.py script will be created next to any
44 44 <notebook-name>.ipynb on each save. This can also be set with the
45 45 short `--script` flag.
46 46 """
47 47 )
48 48
49 49 checkpoint_dir = Unicode(config=True,
50 50 help="""The location in which to keep notebook checkpoints
51 51
52 52 By default, it is notebook-dir/.ipynb_checkpoints
53 53 """
54 54 )
55 55 def _checkpoint_dir_default(self):
56 56 return os.path.join(self.notebook_dir, '.ipynb_checkpoints')
57 57
58 58 def _checkpoint_dir_changed(self, name, old, new):
59 59 """do a bit of validation of the checkpoint dir"""
60 60 if not os.path.isabs(new):
61 61 # If we receive a non-absolute path, make it absolute.
62 62 abs_new = os.path.abspath(new)
63 63 self.checkpoint_dir = abs_new
64 64 return
65 65 if os.path.exists(new) and not os.path.isdir(new):
66 66 raise TraitError("checkpoint dir %r is not a directory" % new)
67 67 if not os.path.exists(new):
68 68 self.log.info("Creating checkpoint dir %s", new)
69 69 try:
70 70 os.mkdir(new)
71 71 except:
72 72 raise TraitError("Couldn't create checkpoint dir %r" % new)
73 73
74 74 def get_notebook_names(self, path=''):
75 75 """List all notebook names in the notebook dir and path."""
76 76 path = path.strip('/')
77 77 if not os.path.isdir(self.get_os_path(path=path)):
78 78 raise web.HTTPError(404, 'Directory not found: ' + path)
79 79 names = glob.glob(self.get_os_path('*'+self.filename_ext, path))
80 80 names = [os.path.basename(name)
81 81 for name in names]
82 82 return names
83 83
84 84 def increment_filename(self, basename, path='', ext='.ipynb'):
85 85 """Return a non-used filename of the form basename<int>."""
86 86 path = path.strip('/')
87 87 for i in itertools.count():
88 88 name = u'{basename}{i}{ext}'.format(basename=basename, i=i, ext=ext)
89 89 os_path = self.get_os_path(name, path)
90 90 if not os.path.isfile(os_path):
91 91 break
92 92 return name
93 93
94 94 def path_exists(self, path):
95 95 """Does the API-style path (directory) actually exist?
96 96
97 97 Parameters
98 98 ----------
99 99 path : string
100 100 The path to check. This is an API path (`/` separated,
101 101 relative to base notebook-dir).
102 102
103 103 Returns
104 104 -------
105 105 exists : bool
106 106 Whether the path is indeed a directory.
107 107 """
108 108 path = path.strip('/')
109 109 os_path = self.get_os_path(path=path)
110 110 return os.path.isdir(os_path)
111 111
112 112 def get_os_path(self, name=None, path=''):
113 113 """Given a notebook name and a URL path, return its file system
114 114 path.
115 115
116 116 Parameters
117 117 ----------
118 118 name : string
119 119 The name of a notebook file with the .ipynb extension
120 120 path : string
121 121 The relative URL path (with '/' as separator) to the named
122 122 notebook.
123 123
124 124 Returns
125 125 -------
126 126 path : string
127 127 A file system path that combines notebook_dir (location where
128 128 server started), the relative path, and the filename with the
129 129 current operating system's url.
130 130 """
131 131 parts = path.strip('/').split('/')
132 132 parts = [p for p in parts if p != ''] # remove duplicate splits
133 133 if name is not None:
134 134 parts.append(name)
135 135 path = os.path.join(self.notebook_dir, *parts)
136 136 return path
137 137
138 138 def notebook_exists(self, name, path=''):
139 139 """Returns a True if the notebook exists. Else, returns False.
140 140
141 141 Parameters
142 142 ----------
143 143 name : string
144 144 The name of the notebook you are checking.
145 145 path : string
146 146 The relative path to the notebook (with '/' as separator)
147 147
148 148 Returns
149 149 -------
150 150 bool
151 151 """
152 152 path = path.strip('/')
153 153 nbpath = self.get_os_path(name, path=path)
154 154 return os.path.isfile(nbpath)
155 155
156 # TODO: Remove this after we create the contents web service and directories are
157 # no longer listed by the notebook web service.
156 158 def list_dirs(self, path):
157 159 """List the directories for a given API style path."""
158 160 path = path.strip('/')
159 161 os_path = self.get_os_path('', path)
160 162 dir_names = os.listdir(os_path)
161 163 dirs = []
162 164 for name in dir_names:
163 165 os_path = self.get_os_path(name, path)
164 166 if os.path.isdir(os_path) and not name.startswith('.'):
165 167 model = self.get_dir_model(name, path)
166 168 dirs.append(model)
167 169 dirs = sorted(dirs, key=lambda item: item['name'])
168 170 return dirs
169 171
172 # TODO: Remove this after we create the contents web service and directories are
173 # no longer listed by the notebook web service.
170 174 def get_dir_model(self, name, path=''):
171 175 """Get the directory model given a directory name and its API style path"""
172 176 path = path.strip('/')
173 177 os_path = self.get_os_path(name, path)
174 178 if not os.path.isdir(os_path):
175 179 raise IOError('directory does not exist: %r' % os_path)
176 180 info = os.stat(os_path)
177 181 last_modified = tz.utcfromtimestamp(info.st_mtime)
178 182 created = tz.utcfromtimestamp(info.st_ctime)
179 183 # Create the notebook model.
180 184 model ={}
181 185 model['name'] = name
182 186 model['path'] = path
183 187 model['last_modified'] = last_modified
184 188 model['created'] = created
185 189 model['type'] = 'directory'
186 190 return model
187 191
188 192 def list_notebooks(self, path):
189 193 """Returns a list of dictionaries that are the standard model
190 194 for all notebooks in the relative 'path'.
191 195
192 196 Parameters
193 197 ----------
194 198 path : str
195 199 the URL path that describes the relative path for the
196 200 listed notebooks
197 201
198 202 Returns
199 203 -------
200 204 notebooks : list of dicts
201 205 a list of the notebook models without 'content'
202 206 """
203 207 path = path.strip('/')
204 208 notebook_names = self.get_notebook_names(path)
205 209 index = []
206 210 notebooks = []
207 211 for name in notebook_names:
208 212 model = self.get_notebook_model(name, path, content=False)
209 213 if name.lower() == 'index.ipynb':
210 214 index.append(model)
211 215 else:
212 216 notebooks.append(model)
213 217 notebooks = sorted(notebooks, key=lambda item: item['name'])
214 218 notebooks = index + self.list_dirs(path) + notebooks
215 219 return notebooks
216 220
217 221 def get_notebook_model(self, name, path='', content=True):
218 222 """ Takes a path and name for a notebook and returns its model
219 223
220 224 Parameters
221 225 ----------
222 226 name : str
223 227 the name of the notebook
224 228 path : str
225 229 the URL path that describes the relative path for
226 230 the notebook
227 231
228 232 Returns
229 233 -------
230 234 model : dict
231 235 the notebook model. If contents=True, returns the 'contents'
232 236 dict in the model as well.
233 237 """
234 238 path = path.strip('/')
235 239 if not self.notebook_exists(name=name, path=path):
236 240 raise web.HTTPError(404, u'Notebook does not exist: %s' % name)
237 241 os_path = self.get_os_path(name, path)
238 242 info = os.stat(os_path)
239 243 last_modified = tz.utcfromtimestamp(info.st_mtime)
240 244 created = tz.utcfromtimestamp(info.st_ctime)
241 245 # Create the notebook model.
242 246 model ={}
243 247 model['name'] = name
244 248 model['path'] = path
245 249 model['last_modified'] = last_modified
246 250 model['created'] = created
247 251 if content:
248 252 with io.open(os_path, 'r', encoding='utf-8') as f:
249 253 try:
250 254 nb = current.read(f, u'json')
251 255 except Exception as e:
252 256 raise web.HTTPError(400, u"Unreadable Notebook: %s %s" % (os_path, e))
253 257 self.mark_trusted_cells(nb, path, name)
254 258 model['content'] = nb
255 259 return model
256 260
257 261 def save_notebook_model(self, model, name='', path=''):
258 262 """Save the notebook model and return the model with no content."""
259 263 path = path.strip('/')
260 264
261 265 if 'content' not in model:
262 266 raise web.HTTPError(400, u'No notebook JSON data provided')
263 267
264 268 # One checkpoint should always exist
265 269 if self.notebook_exists(name, path) and not self.list_checkpoints(name, path):
266 270 self.create_checkpoint(name, path)
267 271
268 272 new_path = model.get('path', path).strip('/')
269 273 new_name = model.get('name', name)
270 274
271 275 if path != new_path or name != new_name:
272 276 self.rename_notebook(name, path, new_name, new_path)
273 277
274 278 # Save the notebook file
275 279 os_path = self.get_os_path(new_name, new_path)
276 280 nb = current.to_notebook_json(model['content'])
277 281
278 282 self.check_and_sign(nb, new_path, new_name)
279 283
280 284 if 'name' in nb['metadata']:
281 285 nb['metadata']['name'] = u''
282 286 try:
283 287 self.log.debug("Autosaving notebook %s", os_path)
284 288 with io.open(os_path, 'w', encoding='utf-8') as f:
285 289 current.write(nb, f, u'json')
286 290 except Exception as e:
287 291 raise web.HTTPError(400, u'Unexpected error while autosaving notebook: %s %s' % (os_path, e))
288 292
289 293 # Save .py script as well
290 294 if self.save_script:
291 295 py_path = os.path.splitext(os_path)[0] + '.py'
292 296 self.log.debug("Writing script %s", py_path)
293 297 try:
294 298 with io.open(py_path, 'w', encoding='utf-8') as f:
295 299 current.write(nb, f, u'py')
296 300 except Exception as e:
297 301 raise web.HTTPError(400, u'Unexpected error while saving notebook as script: %s %s' % (py_path, e))
298 302
299 303 model = self.get_notebook_model(new_name, new_path, content=False)
300 304 return model
301 305
302 306 def update_notebook_model(self, model, name, path=''):
303 307 """Update the notebook's path and/or name"""
304 308 path = path.strip('/')
305 309 new_name = model.get('name', name)
306 310 new_path = model.get('path', path).strip('/')
307 311 if path != new_path or name != new_name:
308 312 self.rename_notebook(name, path, new_name, new_path)
309 313 model = self.get_notebook_model(new_name, new_path, content=False)
310 314 return model
311 315
312 316 def delete_notebook_model(self, name, path=''):
313 317 """Delete notebook by name and path."""
314 318 path = path.strip('/')
315 319 os_path = self.get_os_path(name, path)
316 320 if not os.path.isfile(os_path):
317 321 raise web.HTTPError(404, u'Notebook does not exist: %s' % os_path)
318 322
319 323 # clear checkpoints
320 324 for checkpoint in self.list_checkpoints(name, path):
321 325 checkpoint_id = checkpoint['id']
322 326 cp_path = self.get_checkpoint_path(checkpoint_id, name, path)
323 327 if os.path.isfile(cp_path):
324 328 self.log.debug("Unlinking checkpoint %s", cp_path)
325 329 os.unlink(cp_path)
326 330
327 331 self.log.debug("Unlinking notebook %s", os_path)
328 332 os.unlink(os_path)
329 333
330 334 def rename_notebook(self, old_name, old_path, new_name, new_path):
331 335 """Rename a notebook."""
332 336 old_path = old_path.strip('/')
333 337 new_path = new_path.strip('/')
334 338 if new_name == old_name and new_path == old_path:
335 339 return
336 340
337 341 new_os_path = self.get_os_path(new_name, new_path)
338 342 old_os_path = self.get_os_path(old_name, old_path)
339 343
340 344 # Should we proceed with the move?
341 345 if os.path.isfile(new_os_path):
342 346 raise web.HTTPError(409, u'Notebook with name already exists: %s' % new_os_path)
343 347 if self.save_script:
344 348 old_py_path = os.path.splitext(old_os_path)[0] + '.py'
345 349 new_py_path = os.path.splitext(new_os_path)[0] + '.py'
346 350 if os.path.isfile(new_py_path):
347 351 raise web.HTTPError(409, u'Python script with name already exists: %s' % new_py_path)
348 352
349 353 # Move the notebook file
350 354 try:
351 355 os.rename(old_os_path, new_os_path)
352 356 except Exception as e:
353 357 raise web.HTTPError(500, u'Unknown error renaming notebook: %s %s' % (old_os_path, e))
354 358
355 359 # Move the checkpoints
356 360 old_checkpoints = self.list_checkpoints(old_name, old_path)
357 361 for cp in old_checkpoints:
358 362 checkpoint_id = cp['id']
359 363 old_cp_path = self.get_checkpoint_path(checkpoint_id, old_name, old_path)
360 364 new_cp_path = self.get_checkpoint_path(checkpoint_id, new_name, new_path)
361 365 if os.path.isfile(old_cp_path):
362 366 self.log.debug("Renaming checkpoint %s -> %s", old_cp_path, new_cp_path)
363 367 os.rename(old_cp_path, new_cp_path)
364 368
365 369 # Move the .py script
366 370 if self.save_script:
367 371 os.rename(old_py_path, new_py_path)
368 372
369 373 # Checkpoint-related utilities
370 374
371 375 def get_checkpoint_path(self, checkpoint_id, name, path=''):
372 376 """find the path to a checkpoint"""
373 377 path = path.strip('/')
374 378 basename, _ = os.path.splitext(name)
375 379 filename = u"{name}-{checkpoint_id}{ext}".format(
376 380 name=basename,
377 381 checkpoint_id=checkpoint_id,
378 382 ext=self.filename_ext,
379 383 )
380 384 cp_path = os.path.join(path, self.checkpoint_dir, filename)
381 385 return cp_path
382 386
383 387 def get_checkpoint_model(self, checkpoint_id, name, path=''):
384 388 """construct the info dict for a given checkpoint"""
385 389 path = path.strip('/')
386 390 cp_path = self.get_checkpoint_path(checkpoint_id, name, path)
387 391 stats = os.stat(cp_path)
388 392 last_modified = tz.utcfromtimestamp(stats.st_mtime)
389 393 info = dict(
390 394 id = checkpoint_id,
391 395 last_modified = last_modified,
392 396 )
393 397 return info
394 398
395 399 # public checkpoint API
396 400
397 401 def create_checkpoint(self, name, path=''):
398 402 """Create a checkpoint from the current state of a notebook"""
399 403 path = path.strip('/')
400 404 nb_path = self.get_os_path(name, path)
401 405 # only the one checkpoint ID:
402 406 checkpoint_id = u"checkpoint"
403 407 cp_path = self.get_checkpoint_path(checkpoint_id, name, path)
404 408 self.log.debug("creating checkpoint for notebook %s", name)
405 409 if not os.path.exists(self.checkpoint_dir):
406 410 os.mkdir(self.checkpoint_dir)
407 411 shutil.copy2(nb_path, cp_path)
408 412
409 413 # return the checkpoint info
410 414 return self.get_checkpoint_model(checkpoint_id, name, path)
411 415
412 416 def list_checkpoints(self, name, path=''):
413 417 """list the checkpoints for a given notebook
414 418
415 419 This notebook manager currently only supports one checkpoint per notebook.
416 420 """
417 421 path = path.strip('/')
418 422 checkpoint_id = "checkpoint"
419 423 path = self.get_checkpoint_path(checkpoint_id, name, path)
420 424 if not os.path.exists(path):
421 425 return []
422 426 else:
423 427 return [self.get_checkpoint_model(checkpoint_id, name, path)]
424 428
425 429
426 430 def restore_checkpoint(self, checkpoint_id, name, path=''):
427 431 """restore a notebook to a checkpointed state"""
428 432 path = path.strip('/')
429 433 self.log.info("restoring Notebook %s from checkpoint %s", name, checkpoint_id)
430 434 nb_path = self.get_os_path(name, path)
431 435 cp_path = self.get_checkpoint_path(checkpoint_id, name, path)
432 436 if not os.path.isfile(cp_path):
433 437 self.log.debug("checkpoint file does not exist: %s", cp_path)
434 438 raise web.HTTPError(404,
435 439 u'Notebook checkpoint does not exist: %s-%s' % (name, checkpoint_id)
436 440 )
437 441 # ensure notebook is readable (never restore from an unreadable notebook)
438 442 with io.open(cp_path, 'r', encoding='utf-8') as f:
439 443 nb = current.read(f, u'json')
440 444 shutil.copy2(cp_path, nb_path)
441 445 self.log.debug("copying %s -> %s", cp_path, nb_path)
442 446
443 447 def delete_checkpoint(self, checkpoint_id, name, path=''):
444 448 """delete a notebook's checkpoint"""
445 449 path = path.strip('/')
446 450 cp_path = self.get_checkpoint_path(checkpoint_id, name, path)
447 451 if not os.path.isfile(cp_path):
448 452 raise web.HTTPError(404,
449 453 u'Notebook checkpoint does not exist: %s%s-%s' % (path, name, checkpoint_id)
450 454 )
451 455 self.log.debug("unlinking %s", cp_path)
452 456 os.unlink(cp_path)
453 457
454 458 def info_string(self):
455 459 return "Serving notebooks from local directory: %s" % self.notebook_dir
@@ -1,250 +1,251 b''
1 1 # coding: utf-8
2 2 """Tests for the notebook manager."""
3 3 from __future__ import print_function
4 4
5 5 import os
6 6
7 7 from tornado.web import HTTPError
8 8 from unittest import TestCase
9 9 from tempfile import NamedTemporaryFile
10 10
11 11 from IPython.utils.tempdir import TemporaryDirectory
12 12 from IPython.utils.traitlets import TraitError
13 13 from IPython.html.utils import url_path_join
14 14
15 15 from ..filenbmanager import FileNotebookManager
16 16 from ..nbmanager import NotebookManager
17 17
18
18 19 class TestFileNotebookManager(TestCase):
19 20
20 21 def test_nb_dir(self):
21 22 with TemporaryDirectory() as td:
22 23 fm = FileNotebookManager(notebook_dir=td)
23 24 self.assertEqual(fm.notebook_dir, td)
24 25
25 26 def test_create_nb_dir(self):
26 27 with TemporaryDirectory() as td:
27 28 nbdir = os.path.join(td, 'notebooks')
28 29 fm = FileNotebookManager(notebook_dir=nbdir)
29 30 self.assertEqual(fm.notebook_dir, nbdir)
30 31
31 32 def test_missing_nb_dir(self):
32 33 with TemporaryDirectory() as td:
33 34 nbdir = os.path.join(td, 'notebook', 'dir', 'is', 'missing')
34 35 self.assertRaises(TraitError, FileNotebookManager, notebook_dir=nbdir)
35 36
36 37 def test_invalid_nb_dir(self):
37 38 with NamedTemporaryFile() as tf:
38 39 self.assertRaises(TraitError, FileNotebookManager, notebook_dir=tf.name)
39 40
40 41 def test_get_os_path(self):
41 42 # full filesystem path should be returned with correct operating system
42 43 # separators.
43 44 with TemporaryDirectory() as td:
44 45 nbdir = os.path.join(td, 'notebooks')
45 46 fm = FileNotebookManager(notebook_dir=nbdir)
46 47 path = fm.get_os_path('test.ipynb', '/path/to/notebook/')
47 48 rel_path_list = '/path/to/notebook/test.ipynb'.split('/')
48 49 fs_path = os.path.join(fm.notebook_dir, *rel_path_list)
49 50 self.assertEqual(path, fs_path)
50 51
51 52 fm = FileNotebookManager(notebook_dir=nbdir)
52 53 path = fm.get_os_path('test.ipynb')
53 54 fs_path = os.path.join(fm.notebook_dir, 'test.ipynb')
54 55 self.assertEqual(path, fs_path)
55 56
56 57 fm = FileNotebookManager(notebook_dir=nbdir)
57 58 path = fm.get_os_path('test.ipynb', '////')
58 59 fs_path = os.path.join(fm.notebook_dir, 'test.ipynb')
59 60 self.assertEqual(path, fs_path)
60 61
61 62 class TestNotebookManager(TestCase):
62 63
63 64 def make_dir(self, abs_path, rel_path):
64 65 """make subdirectory, rel_path is the relative path
65 66 to that directory from the location where the server started"""
66 67 os_path = os.path.join(abs_path, rel_path)
67 68 try:
68 69 os.makedirs(os_path)
69 70 except OSError:
70 71 print("Directory already exists.")
71 72
72 73 def test_create_notebook_model(self):
73 74 with TemporaryDirectory() as td:
74 75 # Test in root directory
75 76 nm = FileNotebookManager(notebook_dir=td)
76 77 model = nm.create_notebook_model()
77 78 assert isinstance(model, dict)
78 79 self.assertIn('name', model)
79 80 self.assertIn('path', model)
80 81 self.assertEqual(model['name'], 'Untitled0.ipynb')
81 82 self.assertEqual(model['path'], '')
82 83
83 84 # Test in sub-directory
84 85 sub_dir = '/foo/'
85 86 self.make_dir(nm.notebook_dir, 'foo')
86 87 model = nm.create_notebook_model(None, sub_dir)
87 88 assert isinstance(model, dict)
88 89 self.assertIn('name', model)
89 90 self.assertIn('path', model)
90 91 self.assertEqual(model['name'], 'Untitled0.ipynb')
91 92 self.assertEqual(model['path'], sub_dir.strip('/'))
92 93
93 94 def test_get_notebook_model(self):
94 95 with TemporaryDirectory() as td:
95 96 # Test in root directory
96 97 # Create a notebook
97 98 nm = FileNotebookManager(notebook_dir=td)
98 99 model = nm.create_notebook_model()
99 100 name = model['name']
100 101 path = model['path']
101 102
102 103 # Check that we 'get' on the notebook we just created
103 104 model2 = nm.get_notebook_model(name, path)
104 105 assert isinstance(model2, dict)
105 106 self.assertIn('name', model2)
106 107 self.assertIn('path', model2)
107 108 self.assertEqual(model['name'], name)
108 109 self.assertEqual(model['path'], path)
109 110
110 111 # Test in sub-directory
111 112 sub_dir = '/foo/'
112 113 self.make_dir(nm.notebook_dir, 'foo')
113 114 model = nm.create_notebook_model(None, sub_dir)
114 115 model2 = nm.get_notebook_model(name, sub_dir)
115 116 assert isinstance(model2, dict)
116 117 self.assertIn('name', model2)
117 118 self.assertIn('path', model2)
118 119 self.assertIn('content', model2)
119 120 self.assertEqual(model2['name'], 'Untitled0.ipynb')
120 121 self.assertEqual(model2['path'], sub_dir.strip('/'))
121 122
122 123 def test_update_notebook_model(self):
123 124 with TemporaryDirectory() as td:
124 125 # Test in root directory
125 126 # Create a notebook
126 127 nm = FileNotebookManager(notebook_dir=td)
127 128 model = nm.create_notebook_model()
128 129 name = model['name']
129 130 path = model['path']
130 131
131 132 # Change the name in the model for rename
132 133 model['name'] = 'test.ipynb'
133 134 model = nm.update_notebook_model(model, name, path)
134 135 assert isinstance(model, dict)
135 136 self.assertIn('name', model)
136 137 self.assertIn('path', model)
137 138 self.assertEqual(model['name'], 'test.ipynb')
138 139
139 140 # Make sure the old name is gone
140 141 self.assertRaises(HTTPError, nm.get_notebook_model, name, path)
141 142
142 143 # Test in sub-directory
143 144 # Create a directory and notebook in that directory
144 145 sub_dir = '/foo/'
145 146 self.make_dir(nm.notebook_dir, 'foo')
146 147 model = nm.create_notebook_model(None, sub_dir)
147 148 name = model['name']
148 149 path = model['path']
149 150
150 151 # Change the name in the model for rename
151 152 model['name'] = 'test_in_sub.ipynb'
152 153 model = nm.update_notebook_model(model, name, path)
153 154 assert isinstance(model, dict)
154 155 self.assertIn('name', model)
155 156 self.assertIn('path', model)
156 157 self.assertEqual(model['name'], 'test_in_sub.ipynb')
157 158 self.assertEqual(model['path'], sub_dir.strip('/'))
158 159
159 160 # Make sure the old name is gone
160 161 self.assertRaises(HTTPError, nm.get_notebook_model, name, path)
161 162
162 163 def test_save_notebook_model(self):
163 164 with TemporaryDirectory() as td:
164 165 # Test in the root directory
165 166 # Create a notebook
166 167 nm = FileNotebookManager(notebook_dir=td)
167 168 model = nm.create_notebook_model()
168 169 name = model['name']
169 170 path = model['path']
170 171
171 172 # Get the model with 'content'
172 173 full_model = nm.get_notebook_model(name, path)
173 174
174 175 # Save the notebook
175 176 model = nm.save_notebook_model(full_model, name, path)
176 177 assert isinstance(model, dict)
177 178 self.assertIn('name', model)
178 179 self.assertIn('path', model)
179 180 self.assertEqual(model['name'], name)
180 181 self.assertEqual(model['path'], path)
181 182
182 183 # Test in sub-directory
183 184 # Create a directory and notebook in that directory
184 185 sub_dir = '/foo/'
185 186 self.make_dir(nm.notebook_dir, 'foo')
186 187 model = nm.create_notebook_model(None, sub_dir)
187 188 name = model['name']
188 189 path = model['path']
189 190 model = nm.get_notebook_model(name, path)
190 191
191 192 # Change the name in the model for rename
192 193 model = nm.save_notebook_model(model, name, path)
193 194 assert isinstance(model, dict)
194 195 self.assertIn('name', model)
195 196 self.assertIn('path', model)
196 197 self.assertEqual(model['name'], 'Untitled0.ipynb')
197 198 self.assertEqual(model['path'], sub_dir.strip('/'))
198 199
199 200 def test_save_notebook_with_script(self):
200 201 with TemporaryDirectory() as td:
201 202 # Create a notebook
202 203 nm = FileNotebookManager(notebook_dir=td)
203 204 nm.save_script = True
204 205 model = nm.create_notebook_model()
205 206 name = model['name']
206 207 path = model['path']
207 208
208 209 # Get the model with 'content'
209 210 full_model = nm.get_notebook_model(name, path)
210 211
211 212 # Save the notebook
212 213 model = nm.save_notebook_model(full_model, name, path)
213 214
214 215 # Check that the script was created
215 216 py_path = os.path.join(td, os.path.splitext(name)[0]+'.py')
216 217 assert os.path.exists(py_path), py_path
217 218
218 219 def test_delete_notebook_model(self):
219 220 with TemporaryDirectory() as td:
220 221 # Test in the root directory
221 222 # Create a notebook
222 223 nm = FileNotebookManager(notebook_dir=td)
223 224 model = nm.create_notebook_model()
224 225 name = model['name']
225 226 path = model['path']
226 227
227 228 # Delete the notebook
228 229 nm.delete_notebook_model(name, path)
229 230
230 231 # Check that a 'get' on the deleted notebook raises and error
231 232 self.assertRaises(HTTPError, nm.get_notebook_model, name, path)
232 233
233 234 def test_copy_notebook(self):
234 235 with TemporaryDirectory() as td:
235 236 # Test in the root directory
236 237 # Create a notebook
237 238 nm = FileNotebookManager(notebook_dir=td)
238 239 path = u'Γ₯ b'
239 240 name = u'nb √.ipynb'
240 241 os.mkdir(os.path.join(td, path))
241 242 orig = nm.create_notebook_model({'name' : name}, path=path)
242 243
243 244 # copy with unspecified name
244 245 copy = nm.copy_notebook(name, path=path)
245 246 self.assertEqual(copy['name'], orig['name'].replace('.ipynb', '-Copy0.ipynb'))
246 247
247 248 # copy with specified name
248 249 copy2 = nm.copy_notebook(name, u'copy 2.ipynb', path=path)
249 250 self.assertEqual(copy2['name'], u'copy 2.ipynb')
250 251
@@ -1,323 +1,329 b''
1 1 # coding: utf-8
2 2 """Test the notebooks webservice API."""
3 3
4 4 import io
5 5 import json
6 6 import os
7 7 import shutil
8 8 from unicodedata import normalize
9 9
10 10 pjoin = os.path.join
11 11
12 12 import requests
13 13
14 14 from IPython.html.utils import url_path_join, url_escape
15 15 from IPython.html.tests.launchnotebook import NotebookTestBase, assert_http_error
16 16 from IPython.nbformat import current
17 17 from IPython.nbformat.current import (new_notebook, write, read, new_worksheet,
18 18 new_heading_cell, to_notebook_json)
19 19 from IPython.nbformat import v2
20 20 from IPython.utils import py3compat
21 21 from IPython.utils.data import uniq_stable
22 22
23 23
24 # TODO: Remove this after we create the contents web service and directories are
25 # no longer listed by the notebook web service.
26 def notebooks_only(nb_list):
27 return [nb for nb in nb_list if 'type' not in nb]
28
29
24 30 class NBAPI(object):
25 31 """Wrapper for notebook API calls."""
26 32 def __init__(self, base_url):
27 33 self.base_url = base_url
28 34
29 35 def _req(self, verb, path, body=None):
30 36 response = requests.request(verb,
31 37 url_path_join(self.base_url, 'api/notebooks', path),
32 38 data=body,
33 39 )
34 40 response.raise_for_status()
35 41 return response
36 42
37 43 def list(self, path='/'):
38 44 return self._req('GET', path)
39 45
40 46 def read(self, name, path='/'):
41 47 return self._req('GET', url_path_join(path, name))
42 48
43 49 def create_untitled(self, path='/'):
44 50 return self._req('POST', path)
45 51
46 52 def upload_untitled(self, body, path='/'):
47 53 return self._req('POST', path, body)
48 54
49 55 def copy_untitled(self, copy_from, path='/'):
50 56 body = json.dumps({'copy_from':copy_from})
51 57 return self._req('POST', path, body)
52 58
53 59 def create(self, name, path='/'):
54 60 return self._req('PUT', url_path_join(path, name))
55 61
56 62 def upload(self, name, body, path='/'):
57 63 return self._req('PUT', url_path_join(path, name), body)
58 64
59 65 def copy(self, copy_from, copy_to, path='/'):
60 66 body = json.dumps({'copy_from':copy_from})
61 67 return self._req('PUT', url_path_join(path, copy_to), body)
62 68
63 69 def save(self, name, body, path='/'):
64 70 return self._req('PUT', url_path_join(path, name), body)
65 71
66 72 def delete(self, name, path='/'):
67 73 return self._req('DELETE', url_path_join(path, name))
68 74
69 75 def rename(self, name, path, new_name):
70 76 body = json.dumps({'name': new_name})
71 77 return self._req('PATCH', url_path_join(path, name), body)
72 78
73 79 def get_checkpoints(self, name, path):
74 80 return self._req('GET', url_path_join(path, name, 'checkpoints'))
75 81
76 82 def new_checkpoint(self, name, path):
77 83 return self._req('POST', url_path_join(path, name, 'checkpoints'))
78 84
79 85 def restore_checkpoint(self, name, path, checkpoint_id):
80 86 return self._req('POST', url_path_join(path, name, 'checkpoints', checkpoint_id))
81 87
82 88 def delete_checkpoint(self, name, path, checkpoint_id):
83 89 return self._req('DELETE', url_path_join(path, name, 'checkpoints', checkpoint_id))
84 90
85 91 class APITest(NotebookTestBase):
86 92 """Test the kernels web service API"""
87 93 dirs_nbs = [('', 'inroot'),
88 94 ('Directory with spaces in', 'inspace'),
89 95 (u'unicodΓ©', 'innonascii'),
90 96 ('foo', 'a'),
91 97 ('foo', 'b'),
92 98 ('foo', 'name with spaces'),
93 99 ('foo', u'unicodΓ©'),
94 100 ('foo/bar', 'baz'),
95 101 (u'Γ₯ b', u'Γ§ d')
96 102 ]
97 103
98 104 dirs = uniq_stable([d for (d,n) in dirs_nbs])
99 105 del dirs[0] # remove ''
100 106
101 107 def setUp(self):
102 108 nbdir = self.notebook_dir.name
103 109
104 110 for d in self.dirs:
105 111 d.replace('/', os.sep)
106 112 if not os.path.isdir(pjoin(nbdir, d)):
107 113 os.mkdir(pjoin(nbdir, d))
108 114
109 115 for d, name in self.dirs_nbs:
110 116 d = d.replace('/', os.sep)
111 117 with io.open(pjoin(nbdir, d, '%s.ipynb' % name), 'w',
112 118 encoding='utf-8') as f:
113 119 nb = new_notebook(name=name)
114 120 write(nb, f, format='ipynb')
115 121
116 122 self.nb_api = NBAPI(self.base_url())
117 123
118 124 def tearDown(self):
119 125 nbdir = self.notebook_dir.name
120 126
121 127 for dname in ['foo', 'Directory with spaces in', u'unicodΓ©', u'Γ₯ b']:
122 128 shutil.rmtree(pjoin(nbdir, dname), ignore_errors=True)
123 129
124 130 if os.path.isfile(pjoin(nbdir, 'inroot.ipynb')):
125 131 os.unlink(pjoin(nbdir, 'inroot.ipynb'))
126 132
127 133 def test_list_notebooks(self):
128 nbs = self.nb_api.list().json()
134 nbs = notebooks_only(self.nb_api.list().json())
129 135 self.assertEqual(len(nbs), 1)
130 136 self.assertEqual(nbs[0]['name'], 'inroot.ipynb')
131 137
132 nbs = self.nb_api.list('/Directory with spaces in/').json()
138 nbs = notebooks_only(self.nb_api.list('/Directory with spaces in/').json())
133 139 self.assertEqual(len(nbs), 1)
134 140 self.assertEqual(nbs[0]['name'], 'inspace.ipynb')
135 141
136 nbs = self.nb_api.list(u'/unicodΓ©/').json()
142 nbs = notebooks_only(self.nb_api.list(u'/unicodΓ©/').json())
137 143 self.assertEqual(len(nbs), 1)
138 144 self.assertEqual(nbs[0]['name'], 'innonascii.ipynb')
139 145 self.assertEqual(nbs[0]['path'], u'unicodΓ©')
140 146
141 nbs = self.nb_api.list('/foo/bar/').json()
147 nbs = notebooks_only(self.nb_api.list('/foo/bar/').json())
142 148 self.assertEqual(len(nbs), 1)
143 149 self.assertEqual(nbs[0]['name'], 'baz.ipynb')
144 150 self.assertEqual(nbs[0]['path'], 'foo/bar')
145 151
146 nbs = self.nb_api.list('foo').json()
152 nbs = notebooks_only(self.nb_api.list('foo').json())
147 153 self.assertEqual(len(nbs), 4)
148 154 nbnames = { normalize('NFC', n['name']) for n in nbs }
149 155 expected = [ u'a.ipynb', u'b.ipynb', u'name with spaces.ipynb', u'unicodΓ©.ipynb']
150 156 expected = { normalize('NFC', name) for name in expected }
151 157 self.assertEqual(nbnames, expected)
152 158
153 159 def test_list_nonexistant_dir(self):
154 160 with assert_http_error(404):
155 161 self.nb_api.list('nonexistant')
156 162
157 163 def test_get_contents(self):
158 164 for d, name in self.dirs_nbs:
159 165 nb = self.nb_api.read('%s.ipynb' % name, d+'/').json()
160 166 self.assertEqual(nb['name'], u'%s.ipynb' % name)
161 167 self.assertIn('content', nb)
162 168 self.assertIn('metadata', nb['content'])
163 169 self.assertIsInstance(nb['content']['metadata'], dict)
164 170
165 171 # Name that doesn't exist - should be a 404
166 172 with assert_http_error(404):
167 173 self.nb_api.read('q.ipynb', 'foo')
168 174
169 175 def _check_nb_created(self, resp, name, path):
170 176 self.assertEqual(resp.status_code, 201)
171 177 location_header = py3compat.str_to_unicode(resp.headers['Location'])
172 178 self.assertEqual(location_header, url_escape(url_path_join(u'/api/notebooks', path, name)))
173 179 self.assertEqual(resp.json()['name'], name)
174 180 assert os.path.isfile(pjoin(
175 181 self.notebook_dir.name,
176 182 path.replace('/', os.sep),
177 183 name,
178 184 ))
179 185
180 186 def test_create_untitled(self):
181 187 resp = self.nb_api.create_untitled(path=u'Γ₯ b')
182 188 self._check_nb_created(resp, 'Untitled0.ipynb', u'Γ₯ b')
183 189
184 190 # Second time
185 191 resp = self.nb_api.create_untitled(path=u'Γ₯ b')
186 192 self._check_nb_created(resp, 'Untitled1.ipynb', u'Γ₯ b')
187 193
188 194 # And two directories down
189 195 resp = self.nb_api.create_untitled(path='foo/bar')
190 196 self._check_nb_created(resp, 'Untitled0.ipynb', 'foo/bar')
191 197
192 198 def test_upload_untitled(self):
193 199 nb = new_notebook(name='Upload test')
194 200 nbmodel = {'content': nb}
195 201 resp = self.nb_api.upload_untitled(path=u'Γ₯ b',
196 202 body=json.dumps(nbmodel))
197 203 self._check_nb_created(resp, 'Untitled0.ipynb', u'Γ₯ b')
198 204
199 205 def test_upload(self):
200 206 nb = new_notebook(name=u'ignored')
201 207 nbmodel = {'content': nb}
202 208 resp = self.nb_api.upload(u'Upload tΓ©st.ipynb', path=u'Γ₯ b',
203 209 body=json.dumps(nbmodel))
204 210 self._check_nb_created(resp, u'Upload tΓ©st.ipynb', u'Γ₯ b')
205 211
206 212 def test_upload_v2(self):
207 213 nb = v2.new_notebook()
208 214 ws = v2.new_worksheet()
209 215 nb.worksheets.append(ws)
210 216 ws.cells.append(v2.new_code_cell(input='print("hi")'))
211 217 nbmodel = {'content': nb}
212 218 resp = self.nb_api.upload(u'Upload tΓ©st.ipynb', path=u'Γ₯ b',
213 219 body=json.dumps(nbmodel))
214 220 self._check_nb_created(resp, u'Upload tΓ©st.ipynb', u'Γ₯ b')
215 221 resp = self.nb_api.read(u'Upload tΓ©st.ipynb', u'Γ₯ b')
216 222 data = resp.json()
217 223 self.assertEqual(data['content']['nbformat'], current.nbformat)
218 224 self.assertEqual(data['content']['orig_nbformat'], 2)
219 225
220 226 def test_copy_untitled(self):
221 227 resp = self.nb_api.copy_untitled(u'Γ§ d.ipynb', path=u'Γ₯ b')
222 228 self._check_nb_created(resp, u'Γ§ d-Copy0.ipynb', u'Γ₯ b')
223 229
224 230 def test_copy(self):
225 231 resp = self.nb_api.copy(u'Γ§ d.ipynb', u'cΓΈpy.ipynb', path=u'Γ₯ b')
226 232 self._check_nb_created(resp, u'cΓΈpy.ipynb', u'Γ₯ b')
227 233
228 234 def test_delete(self):
229 235 for d, name in self.dirs_nbs:
230 236 resp = self.nb_api.delete('%s.ipynb' % name, d)
231 237 self.assertEqual(resp.status_code, 204)
232 238
233 239 for d in self.dirs + ['/']:
234 nbs = self.nb_api.list(d).json()
240 nbs = notebooks_only(self.nb_api.list(d).json())
235 241 self.assertEqual(len(nbs), 0)
236 242
237 243 def test_rename(self):
238 244 resp = self.nb_api.rename('a.ipynb', 'foo', 'z.ipynb')
239 245 self.assertEqual(resp.headers['Location'].split('/')[-1], 'z.ipynb')
240 246 self.assertEqual(resp.json()['name'], 'z.ipynb')
241 247 assert os.path.isfile(pjoin(self.notebook_dir.name, 'foo', 'z.ipynb'))
242 248
243 nbs = self.nb_api.list('foo').json()
249 nbs = notebooks_only(self.nb_api.list('foo').json())
244 250 nbnames = set(n['name'] for n in nbs)
245 251 self.assertIn('z.ipynb', nbnames)
246 252 self.assertNotIn('a.ipynb', nbnames)
247 253
248 254 def test_rename_existing(self):
249 255 with assert_http_error(409):
250 256 self.nb_api.rename('a.ipynb', 'foo', 'b.ipynb')
251 257
252 258 def test_save(self):
253 259 resp = self.nb_api.read('a.ipynb', 'foo')
254 260 nbcontent = json.loads(resp.text)['content']
255 261 nb = to_notebook_json(nbcontent)
256 262 ws = new_worksheet()
257 263 nb.worksheets = [ws]
258 264 ws.cells.append(new_heading_cell(u'Created by test Β³'))
259 265
260 266 nbmodel= {'name': 'a.ipynb', 'path':'foo', 'content': nb}
261 267 resp = self.nb_api.save('a.ipynb', path='foo', body=json.dumps(nbmodel))
262 268
263 269 nbfile = pjoin(self.notebook_dir.name, 'foo', 'a.ipynb')
264 270 with io.open(nbfile, 'r', encoding='utf-8') as f:
265 271 newnb = read(f, format='ipynb')
266 272 self.assertEqual(newnb.worksheets[0].cells[0].source,
267 273 u'Created by test Β³')
268 274 nbcontent = self.nb_api.read('a.ipynb', 'foo').json()['content']
269 275 newnb = to_notebook_json(nbcontent)
270 276 self.assertEqual(newnb.worksheets[0].cells[0].source,
271 277 u'Created by test Β³')
272 278
273 279 # Save and rename
274 280 nbmodel= {'name': 'a2.ipynb', 'path':'foo/bar', 'content': nb}
275 281 resp = self.nb_api.save('a.ipynb', path='foo', body=json.dumps(nbmodel))
276 282 saved = resp.json()
277 283 self.assertEqual(saved['name'], 'a2.ipynb')
278 284 self.assertEqual(saved['path'], 'foo/bar')
279 285 assert os.path.isfile(pjoin(self.notebook_dir.name,'foo','bar','a2.ipynb'))
280 286 assert not os.path.isfile(pjoin(self.notebook_dir.name, 'foo', 'a.ipynb'))
281 287 with assert_http_error(404):
282 288 self.nb_api.read('a.ipynb', 'foo')
283 289
284 290 def test_checkpoints(self):
285 291 resp = self.nb_api.read('a.ipynb', 'foo')
286 292 r = self.nb_api.new_checkpoint('a.ipynb', 'foo')
287 293 self.assertEqual(r.status_code, 201)
288 294 cp1 = r.json()
289 295 self.assertEqual(set(cp1), {'id', 'last_modified'})
290 296 self.assertEqual(r.headers['Location'].split('/')[-1], cp1['id'])
291 297
292 298 # Modify it
293 299 nbcontent = json.loads(resp.text)['content']
294 300 nb = to_notebook_json(nbcontent)
295 301 ws = new_worksheet()
296 302 nb.worksheets = [ws]
297 303 hcell = new_heading_cell('Created by test')
298 304 ws.cells.append(hcell)
299 305 # Save
300 306 nbmodel= {'name': 'a.ipynb', 'path':'foo', 'content': nb}
301 307 resp = self.nb_api.save('a.ipynb', path='foo', body=json.dumps(nbmodel))
302 308
303 309 # List checkpoints
304 310 cps = self.nb_api.get_checkpoints('a.ipynb', 'foo').json()
305 311 self.assertEqual(cps, [cp1])
306 312
307 313 nbcontent = self.nb_api.read('a.ipynb', 'foo').json()['content']
308 314 nb = to_notebook_json(nbcontent)
309 315 self.assertEqual(nb.worksheets[0].cells[0].source, 'Created by test')
310 316
311 317 # Restore cp1
312 318 r = self.nb_api.restore_checkpoint('a.ipynb', 'foo', cp1['id'])
313 319 self.assertEqual(r.status_code, 204)
314 320 nbcontent = self.nb_api.read('a.ipynb', 'foo').json()['content']
315 321 nb = to_notebook_json(nbcontent)
316 322 self.assertEqual(nb.worksheets, [])
317 323
318 324 # Delete cp1
319 325 r = self.nb_api.delete_checkpoint('a.ipynb', 'foo', cp1['id'])
320 326 self.assertEqual(r.status_code, 204)
321 327 cps = self.nb_api.get_checkpoints('a.ipynb', 'foo').json()
322 328 self.assertEqual(cps, [])
323 329
General Comments 0
You need to be logged in to leave comments. Login now