##// END OF EJS Templates
Get the existing tests working.
Brian E. Granger -
Show More
@@ -1,455 +1,459 b''
1 """A notebook manager that uses the local file system for storage.
1 """A notebook manager that uses the local file system for storage.
2
2
3 Authors:
3 Authors:
4
4
5 * Brian Granger
5 * Brian Granger
6 * Zach Sailer
6 * Zach Sailer
7 """
7 """
8
8
9 #-----------------------------------------------------------------------------
9 #-----------------------------------------------------------------------------
10 # Copyright (C) 2011 The IPython Development Team
10 # Copyright (C) 2011 The IPython Development Team
11 #
11 #
12 # Distributed under the terms of the BSD License. The full license is in
12 # Distributed under the terms of the BSD License. The full license is in
13 # the file COPYING, distributed as part of this software.
13 # the file COPYING, distributed as part of this software.
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15
15
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17 # Imports
17 # Imports
18 #-----------------------------------------------------------------------------
18 #-----------------------------------------------------------------------------
19
19
20 import io
20 import io
21 import itertools
21 import itertools
22 import os
22 import os
23 import glob
23 import glob
24 import shutil
24 import shutil
25
25
26 from tornado import web
26 from tornado import web
27
27
28 from .nbmanager import NotebookManager
28 from .nbmanager import NotebookManager
29 from IPython.nbformat import current
29 from IPython.nbformat import current
30 from IPython.utils.traitlets import Unicode, Dict, Bool, TraitError
30 from IPython.utils.traitlets import Unicode, Dict, Bool, TraitError
31 from IPython.utils import tz
31 from IPython.utils import tz
32
32
33 #-----------------------------------------------------------------------------
33 #-----------------------------------------------------------------------------
34 # Classes
34 # Classes
35 #-----------------------------------------------------------------------------
35 #-----------------------------------------------------------------------------
36
36
37 class FileNotebookManager(NotebookManager):
37 class FileNotebookManager(NotebookManager):
38
38
39 save_script = Bool(False, config=True,
39 save_script = Bool(False, config=True,
40 help="""Automatically create a Python script when saving the notebook.
40 help="""Automatically create a Python script when saving the notebook.
41
41
42 For easier use of import, %run and %load across notebooks, a
42 For easier use of import, %run and %load across notebooks, a
43 <notebook-name>.py script will be created next to any
43 <notebook-name>.py script will be created next to any
44 <notebook-name>.ipynb on each save. This can also be set with the
44 <notebook-name>.ipynb on each save. This can also be set with the
45 short `--script` flag.
45 short `--script` flag.
46 """
46 """
47 )
47 )
48
48
49 checkpoint_dir = Unicode(config=True,
49 checkpoint_dir = Unicode(config=True,
50 help="""The location in which to keep notebook checkpoints
50 help="""The location in which to keep notebook checkpoints
51
51
52 By default, it is notebook-dir/.ipynb_checkpoints
52 By default, it is notebook-dir/.ipynb_checkpoints
53 """
53 """
54 )
54 )
55 def _checkpoint_dir_default(self):
55 def _checkpoint_dir_default(self):
56 return os.path.join(self.notebook_dir, '.ipynb_checkpoints')
56 return os.path.join(self.notebook_dir, '.ipynb_checkpoints')
57
57
58 def _checkpoint_dir_changed(self, name, old, new):
58 def _checkpoint_dir_changed(self, name, old, new):
59 """do a bit of validation of the checkpoint dir"""
59 """do a bit of validation of the checkpoint dir"""
60 if not os.path.isabs(new):
60 if not os.path.isabs(new):
61 # If we receive a non-absolute path, make it absolute.
61 # If we receive a non-absolute path, make it absolute.
62 abs_new = os.path.abspath(new)
62 abs_new = os.path.abspath(new)
63 self.checkpoint_dir = abs_new
63 self.checkpoint_dir = abs_new
64 return
64 return
65 if os.path.exists(new) and not os.path.isdir(new):
65 if os.path.exists(new) and not os.path.isdir(new):
66 raise TraitError("checkpoint dir %r is not a directory" % new)
66 raise TraitError("checkpoint dir %r is not a directory" % new)
67 if not os.path.exists(new):
67 if not os.path.exists(new):
68 self.log.info("Creating checkpoint dir %s", new)
68 self.log.info("Creating checkpoint dir %s", new)
69 try:
69 try:
70 os.mkdir(new)
70 os.mkdir(new)
71 except:
71 except:
72 raise TraitError("Couldn't create checkpoint dir %r" % new)
72 raise TraitError("Couldn't create checkpoint dir %r" % new)
73
73
74 def get_notebook_names(self, path=''):
74 def get_notebook_names(self, path=''):
75 """List all notebook names in the notebook dir and path."""
75 """List all notebook names in the notebook dir and path."""
76 path = path.strip('/')
76 path = path.strip('/')
77 if not os.path.isdir(self.get_os_path(path=path)):
77 if not os.path.isdir(self.get_os_path(path=path)):
78 raise web.HTTPError(404, 'Directory not found: ' + path)
78 raise web.HTTPError(404, 'Directory not found: ' + path)
79 names = glob.glob(self.get_os_path('*'+self.filename_ext, path))
79 names = glob.glob(self.get_os_path('*'+self.filename_ext, path))
80 names = [os.path.basename(name)
80 names = [os.path.basename(name)
81 for name in names]
81 for name in names]
82 return names
82 return names
83
83
84 def increment_filename(self, basename, path='', ext='.ipynb'):
84 def increment_filename(self, basename, path='', ext='.ipynb'):
85 """Return a non-used filename of the form basename<int>."""
85 """Return a non-used filename of the form basename<int>."""
86 path = path.strip('/')
86 path = path.strip('/')
87 for i in itertools.count():
87 for i in itertools.count():
88 name = u'{basename}{i}{ext}'.format(basename=basename, i=i, ext=ext)
88 name = u'{basename}{i}{ext}'.format(basename=basename, i=i, ext=ext)
89 os_path = self.get_os_path(name, path)
89 os_path = self.get_os_path(name, path)
90 if not os.path.isfile(os_path):
90 if not os.path.isfile(os_path):
91 break
91 break
92 return name
92 return name
93
93
94 def path_exists(self, path):
94 def path_exists(self, path):
95 """Does the API-style path (directory) actually exist?
95 """Does the API-style path (directory) actually exist?
96
96
97 Parameters
97 Parameters
98 ----------
98 ----------
99 path : string
99 path : string
100 The path to check. This is an API path (`/` separated,
100 The path to check. This is an API path (`/` separated,
101 relative to base notebook-dir).
101 relative to base notebook-dir).
102
102
103 Returns
103 Returns
104 -------
104 -------
105 exists : bool
105 exists : bool
106 Whether the path is indeed a directory.
106 Whether the path is indeed a directory.
107 """
107 """
108 path = path.strip('/')
108 path = path.strip('/')
109 os_path = self.get_os_path(path=path)
109 os_path = self.get_os_path(path=path)
110 return os.path.isdir(os_path)
110 return os.path.isdir(os_path)
111
111
112 def get_os_path(self, name=None, path=''):
112 def get_os_path(self, name=None, path=''):
113 """Given a notebook name and a URL path, return its file system
113 """Given a notebook name and a URL path, return its file system
114 path.
114 path.
115
115
116 Parameters
116 Parameters
117 ----------
117 ----------
118 name : string
118 name : string
119 The name of a notebook file with the .ipynb extension
119 The name of a notebook file with the .ipynb extension
120 path : string
120 path : string
121 The relative URL path (with '/' as separator) to the named
121 The relative URL path (with '/' as separator) to the named
122 notebook.
122 notebook.
123
123
124 Returns
124 Returns
125 -------
125 -------
126 path : string
126 path : string
127 A file system path that combines notebook_dir (location where
127 A file system path that combines notebook_dir (location where
128 server started), the relative path, and the filename with the
128 server started), the relative path, and the filename with the
129 current operating system's url.
129 current operating system's url.
130 """
130 """
131 parts = path.strip('/').split('/')
131 parts = path.strip('/').split('/')
132 parts = [p for p in parts if p != ''] # remove duplicate splits
132 parts = [p for p in parts if p != ''] # remove duplicate splits
133 if name is not None:
133 if name is not None:
134 parts.append(name)
134 parts.append(name)
135 path = os.path.join(self.notebook_dir, *parts)
135 path = os.path.join(self.notebook_dir, *parts)
136 return path
136 return path
137
137
138 def notebook_exists(self, name, path=''):
138 def notebook_exists(self, name, path=''):
139 """Returns a True if the notebook exists. Else, returns False.
139 """Returns a True if the notebook exists. Else, returns False.
140
140
141 Parameters
141 Parameters
142 ----------
142 ----------
143 name : string
143 name : string
144 The name of the notebook you are checking.
144 The name of the notebook you are checking.
145 path : string
145 path : string
146 The relative path to the notebook (with '/' as separator)
146 The relative path to the notebook (with '/' as separator)
147
147
148 Returns
148 Returns
149 -------
149 -------
150 bool
150 bool
151 """
151 """
152 path = path.strip('/')
152 path = path.strip('/')
153 nbpath = self.get_os_path(name, path=path)
153 nbpath = self.get_os_path(name, path=path)
154 return os.path.isfile(nbpath)
154 return os.path.isfile(nbpath)
155
155
156 # TODO: Remove this after we create the contents web service and directories are
157 # no longer listed by the notebook web service.
156 def list_dirs(self, path):
158 def list_dirs(self, path):
157 """List the directories for a given API style path."""
159 """List the directories for a given API style path."""
158 path = path.strip('/')
160 path = path.strip('/')
159 os_path = self.get_os_path('', path)
161 os_path = self.get_os_path('', path)
160 dir_names = os.listdir(os_path)
162 dir_names = os.listdir(os_path)
161 dirs = []
163 dirs = []
162 for name in dir_names:
164 for name in dir_names:
163 os_path = self.get_os_path(name, path)
165 os_path = self.get_os_path(name, path)
164 if os.path.isdir(os_path) and not name.startswith('.'):
166 if os.path.isdir(os_path) and not name.startswith('.'):
165 model = self.get_dir_model(name, path)
167 model = self.get_dir_model(name, path)
166 dirs.append(model)
168 dirs.append(model)
167 dirs = sorted(dirs, key=lambda item: item['name'])
169 dirs = sorted(dirs, key=lambda item: item['name'])
168 return dirs
170 return dirs
169
171
172 # TODO: Remove this after we create the contents web service and directories are
173 # no longer listed by the notebook web service.
170 def get_dir_model(self, name, path=''):
174 def get_dir_model(self, name, path=''):
171 """Get the directory model given a directory name and its API style path"""
175 """Get the directory model given a directory name and its API style path"""
172 path = path.strip('/')
176 path = path.strip('/')
173 os_path = self.get_os_path(name, path)
177 os_path = self.get_os_path(name, path)
174 if not os.path.isdir(os_path):
178 if not os.path.isdir(os_path):
175 raise IOError('directory does not exist: %r' % os_path)
179 raise IOError('directory does not exist: %r' % os_path)
176 info = os.stat(os_path)
180 info = os.stat(os_path)
177 last_modified = tz.utcfromtimestamp(info.st_mtime)
181 last_modified = tz.utcfromtimestamp(info.st_mtime)
178 created = tz.utcfromtimestamp(info.st_ctime)
182 created = tz.utcfromtimestamp(info.st_ctime)
179 # Create the notebook model.
183 # Create the notebook model.
180 model ={}
184 model ={}
181 model['name'] = name
185 model['name'] = name
182 model['path'] = path
186 model['path'] = path
183 model['last_modified'] = last_modified
187 model['last_modified'] = last_modified
184 model['created'] = created
188 model['created'] = created
185 model['type'] = 'directory'
189 model['type'] = 'directory'
186 return model
190 return model
187
191
188 def list_notebooks(self, path):
192 def list_notebooks(self, path):
189 """Returns a list of dictionaries that are the standard model
193 """Returns a list of dictionaries that are the standard model
190 for all notebooks in the relative 'path'.
194 for all notebooks in the relative 'path'.
191
195
192 Parameters
196 Parameters
193 ----------
197 ----------
194 path : str
198 path : str
195 the URL path that describes the relative path for the
199 the URL path that describes the relative path for the
196 listed notebooks
200 listed notebooks
197
201
198 Returns
202 Returns
199 -------
203 -------
200 notebooks : list of dicts
204 notebooks : list of dicts
201 a list of the notebook models without 'content'
205 a list of the notebook models without 'content'
202 """
206 """
203 path = path.strip('/')
207 path = path.strip('/')
204 notebook_names = self.get_notebook_names(path)
208 notebook_names = self.get_notebook_names(path)
205 index = []
209 index = []
206 notebooks = []
210 notebooks = []
207 for name in notebook_names:
211 for name in notebook_names:
208 model = self.get_notebook_model(name, path, content=False)
212 model = self.get_notebook_model(name, path, content=False)
209 if name.lower() == 'index.ipynb':
213 if name.lower() == 'index.ipynb':
210 index.append(model)
214 index.append(model)
211 else:
215 else:
212 notebooks.append(model)
216 notebooks.append(model)
213 notebooks = sorted(notebooks, key=lambda item: item['name'])
217 notebooks = sorted(notebooks, key=lambda item: item['name'])
214 notebooks = index + self.list_dirs(path) + notebooks
218 notebooks = index + self.list_dirs(path) + notebooks
215 return notebooks
219 return notebooks
216
220
217 def get_notebook_model(self, name, path='', content=True):
221 def get_notebook_model(self, name, path='', content=True):
218 """ Takes a path and name for a notebook and returns its model
222 """ Takes a path and name for a notebook and returns its model
219
223
220 Parameters
224 Parameters
221 ----------
225 ----------
222 name : str
226 name : str
223 the name of the notebook
227 the name of the notebook
224 path : str
228 path : str
225 the URL path that describes the relative path for
229 the URL path that describes the relative path for
226 the notebook
230 the notebook
227
231
228 Returns
232 Returns
229 -------
233 -------
230 model : dict
234 model : dict
231 the notebook model. If contents=True, returns the 'contents'
235 the notebook model. If contents=True, returns the 'contents'
232 dict in the model as well.
236 dict in the model as well.
233 """
237 """
234 path = path.strip('/')
238 path = path.strip('/')
235 if not self.notebook_exists(name=name, path=path):
239 if not self.notebook_exists(name=name, path=path):
236 raise web.HTTPError(404, u'Notebook does not exist: %s' % name)
240 raise web.HTTPError(404, u'Notebook does not exist: %s' % name)
237 os_path = self.get_os_path(name, path)
241 os_path = self.get_os_path(name, path)
238 info = os.stat(os_path)
242 info = os.stat(os_path)
239 last_modified = tz.utcfromtimestamp(info.st_mtime)
243 last_modified = tz.utcfromtimestamp(info.st_mtime)
240 created = tz.utcfromtimestamp(info.st_ctime)
244 created = tz.utcfromtimestamp(info.st_ctime)
241 # Create the notebook model.
245 # Create the notebook model.
242 model ={}
246 model ={}
243 model['name'] = name
247 model['name'] = name
244 model['path'] = path
248 model['path'] = path
245 model['last_modified'] = last_modified
249 model['last_modified'] = last_modified
246 model['created'] = created
250 model['created'] = created
247 if content:
251 if content:
248 with io.open(os_path, 'r', encoding='utf-8') as f:
252 with io.open(os_path, 'r', encoding='utf-8') as f:
249 try:
253 try:
250 nb = current.read(f, u'json')
254 nb = current.read(f, u'json')
251 except Exception as e:
255 except Exception as e:
252 raise web.HTTPError(400, u"Unreadable Notebook: %s %s" % (os_path, e))
256 raise web.HTTPError(400, u"Unreadable Notebook: %s %s" % (os_path, e))
253 self.mark_trusted_cells(nb, path, name)
257 self.mark_trusted_cells(nb, path, name)
254 model['content'] = nb
258 model['content'] = nb
255 return model
259 return model
256
260
257 def save_notebook_model(self, model, name='', path=''):
261 def save_notebook_model(self, model, name='', path=''):
258 """Save the notebook model and return the model with no content."""
262 """Save the notebook model and return the model with no content."""
259 path = path.strip('/')
263 path = path.strip('/')
260
264
261 if 'content' not in model:
265 if 'content' not in model:
262 raise web.HTTPError(400, u'No notebook JSON data provided')
266 raise web.HTTPError(400, u'No notebook JSON data provided')
263
267
264 # One checkpoint should always exist
268 # One checkpoint should always exist
265 if self.notebook_exists(name, path) and not self.list_checkpoints(name, path):
269 if self.notebook_exists(name, path) and not self.list_checkpoints(name, path):
266 self.create_checkpoint(name, path)
270 self.create_checkpoint(name, path)
267
271
268 new_path = model.get('path', path).strip('/')
272 new_path = model.get('path', path).strip('/')
269 new_name = model.get('name', name)
273 new_name = model.get('name', name)
270
274
271 if path != new_path or name != new_name:
275 if path != new_path or name != new_name:
272 self.rename_notebook(name, path, new_name, new_path)
276 self.rename_notebook(name, path, new_name, new_path)
273
277
274 # Save the notebook file
278 # Save the notebook file
275 os_path = self.get_os_path(new_name, new_path)
279 os_path = self.get_os_path(new_name, new_path)
276 nb = current.to_notebook_json(model['content'])
280 nb = current.to_notebook_json(model['content'])
277
281
278 self.check_and_sign(nb, new_path, new_name)
282 self.check_and_sign(nb, new_path, new_name)
279
283
280 if 'name' in nb['metadata']:
284 if 'name' in nb['metadata']:
281 nb['metadata']['name'] = u''
285 nb['metadata']['name'] = u''
282 try:
286 try:
283 self.log.debug("Autosaving notebook %s", os_path)
287 self.log.debug("Autosaving notebook %s", os_path)
284 with io.open(os_path, 'w', encoding='utf-8') as f:
288 with io.open(os_path, 'w', encoding='utf-8') as f:
285 current.write(nb, f, u'json')
289 current.write(nb, f, u'json')
286 except Exception as e:
290 except Exception as e:
287 raise web.HTTPError(400, u'Unexpected error while autosaving notebook: %s %s' % (os_path, e))
291 raise web.HTTPError(400, u'Unexpected error while autosaving notebook: %s %s' % (os_path, e))
288
292
289 # Save .py script as well
293 # Save .py script as well
290 if self.save_script:
294 if self.save_script:
291 py_path = os.path.splitext(os_path)[0] + '.py'
295 py_path = os.path.splitext(os_path)[0] + '.py'
292 self.log.debug("Writing script %s", py_path)
296 self.log.debug("Writing script %s", py_path)
293 try:
297 try:
294 with io.open(py_path, 'w', encoding='utf-8') as f:
298 with io.open(py_path, 'w', encoding='utf-8') as f:
295 current.write(nb, f, u'py')
299 current.write(nb, f, u'py')
296 except Exception as e:
300 except Exception as e:
297 raise web.HTTPError(400, u'Unexpected error while saving notebook as script: %s %s' % (py_path, e))
301 raise web.HTTPError(400, u'Unexpected error while saving notebook as script: %s %s' % (py_path, e))
298
302
299 model = self.get_notebook_model(new_name, new_path, content=False)
303 model = self.get_notebook_model(new_name, new_path, content=False)
300 return model
304 return model
301
305
302 def update_notebook_model(self, model, name, path=''):
306 def update_notebook_model(self, model, name, path=''):
303 """Update the notebook's path and/or name"""
307 """Update the notebook's path and/or name"""
304 path = path.strip('/')
308 path = path.strip('/')
305 new_name = model.get('name', name)
309 new_name = model.get('name', name)
306 new_path = model.get('path', path).strip('/')
310 new_path = model.get('path', path).strip('/')
307 if path != new_path or name != new_name:
311 if path != new_path or name != new_name:
308 self.rename_notebook(name, path, new_name, new_path)
312 self.rename_notebook(name, path, new_name, new_path)
309 model = self.get_notebook_model(new_name, new_path, content=False)
313 model = self.get_notebook_model(new_name, new_path, content=False)
310 return model
314 return model
311
315
312 def delete_notebook_model(self, name, path=''):
316 def delete_notebook_model(self, name, path=''):
313 """Delete notebook by name and path."""
317 """Delete notebook by name and path."""
314 path = path.strip('/')
318 path = path.strip('/')
315 os_path = self.get_os_path(name, path)
319 os_path = self.get_os_path(name, path)
316 if not os.path.isfile(os_path):
320 if not os.path.isfile(os_path):
317 raise web.HTTPError(404, u'Notebook does not exist: %s' % os_path)
321 raise web.HTTPError(404, u'Notebook does not exist: %s' % os_path)
318
322
319 # clear checkpoints
323 # clear checkpoints
320 for checkpoint in self.list_checkpoints(name, path):
324 for checkpoint in self.list_checkpoints(name, path):
321 checkpoint_id = checkpoint['id']
325 checkpoint_id = checkpoint['id']
322 cp_path = self.get_checkpoint_path(checkpoint_id, name, path)
326 cp_path = self.get_checkpoint_path(checkpoint_id, name, path)
323 if os.path.isfile(cp_path):
327 if os.path.isfile(cp_path):
324 self.log.debug("Unlinking checkpoint %s", cp_path)
328 self.log.debug("Unlinking checkpoint %s", cp_path)
325 os.unlink(cp_path)
329 os.unlink(cp_path)
326
330
327 self.log.debug("Unlinking notebook %s", os_path)
331 self.log.debug("Unlinking notebook %s", os_path)
328 os.unlink(os_path)
332 os.unlink(os_path)
329
333
330 def rename_notebook(self, old_name, old_path, new_name, new_path):
334 def rename_notebook(self, old_name, old_path, new_name, new_path):
331 """Rename a notebook."""
335 """Rename a notebook."""
332 old_path = old_path.strip('/')
336 old_path = old_path.strip('/')
333 new_path = new_path.strip('/')
337 new_path = new_path.strip('/')
334 if new_name == old_name and new_path == old_path:
338 if new_name == old_name and new_path == old_path:
335 return
339 return
336
340
337 new_os_path = self.get_os_path(new_name, new_path)
341 new_os_path = self.get_os_path(new_name, new_path)
338 old_os_path = self.get_os_path(old_name, old_path)
342 old_os_path = self.get_os_path(old_name, old_path)
339
343
340 # Should we proceed with the move?
344 # Should we proceed with the move?
341 if os.path.isfile(new_os_path):
345 if os.path.isfile(new_os_path):
342 raise web.HTTPError(409, u'Notebook with name already exists: %s' % new_os_path)
346 raise web.HTTPError(409, u'Notebook with name already exists: %s' % new_os_path)
343 if self.save_script:
347 if self.save_script:
344 old_py_path = os.path.splitext(old_os_path)[0] + '.py'
348 old_py_path = os.path.splitext(old_os_path)[0] + '.py'
345 new_py_path = os.path.splitext(new_os_path)[0] + '.py'
349 new_py_path = os.path.splitext(new_os_path)[0] + '.py'
346 if os.path.isfile(new_py_path):
350 if os.path.isfile(new_py_path):
347 raise web.HTTPError(409, u'Python script with name already exists: %s' % new_py_path)
351 raise web.HTTPError(409, u'Python script with name already exists: %s' % new_py_path)
348
352
349 # Move the notebook file
353 # Move the notebook file
350 try:
354 try:
351 os.rename(old_os_path, new_os_path)
355 os.rename(old_os_path, new_os_path)
352 except Exception as e:
356 except Exception as e:
353 raise web.HTTPError(500, u'Unknown error renaming notebook: %s %s' % (old_os_path, e))
357 raise web.HTTPError(500, u'Unknown error renaming notebook: %s %s' % (old_os_path, e))
354
358
355 # Move the checkpoints
359 # Move the checkpoints
356 old_checkpoints = self.list_checkpoints(old_name, old_path)
360 old_checkpoints = self.list_checkpoints(old_name, old_path)
357 for cp in old_checkpoints:
361 for cp in old_checkpoints:
358 checkpoint_id = cp['id']
362 checkpoint_id = cp['id']
359 old_cp_path = self.get_checkpoint_path(checkpoint_id, old_name, old_path)
363 old_cp_path = self.get_checkpoint_path(checkpoint_id, old_name, old_path)
360 new_cp_path = self.get_checkpoint_path(checkpoint_id, new_name, new_path)
364 new_cp_path = self.get_checkpoint_path(checkpoint_id, new_name, new_path)
361 if os.path.isfile(old_cp_path):
365 if os.path.isfile(old_cp_path):
362 self.log.debug("Renaming checkpoint %s -> %s", old_cp_path, new_cp_path)
366 self.log.debug("Renaming checkpoint %s -> %s", old_cp_path, new_cp_path)
363 os.rename(old_cp_path, new_cp_path)
367 os.rename(old_cp_path, new_cp_path)
364
368
365 # Move the .py script
369 # Move the .py script
366 if self.save_script:
370 if self.save_script:
367 os.rename(old_py_path, new_py_path)
371 os.rename(old_py_path, new_py_path)
368
372
369 # Checkpoint-related utilities
373 # Checkpoint-related utilities
370
374
371 def get_checkpoint_path(self, checkpoint_id, name, path=''):
375 def get_checkpoint_path(self, checkpoint_id, name, path=''):
372 """find the path to a checkpoint"""
376 """find the path to a checkpoint"""
373 path = path.strip('/')
377 path = path.strip('/')
374 basename, _ = os.path.splitext(name)
378 basename, _ = os.path.splitext(name)
375 filename = u"{name}-{checkpoint_id}{ext}".format(
379 filename = u"{name}-{checkpoint_id}{ext}".format(
376 name=basename,
380 name=basename,
377 checkpoint_id=checkpoint_id,
381 checkpoint_id=checkpoint_id,
378 ext=self.filename_ext,
382 ext=self.filename_ext,
379 )
383 )
380 cp_path = os.path.join(path, self.checkpoint_dir, filename)
384 cp_path = os.path.join(path, self.checkpoint_dir, filename)
381 return cp_path
385 return cp_path
382
386
383 def get_checkpoint_model(self, checkpoint_id, name, path=''):
387 def get_checkpoint_model(self, checkpoint_id, name, path=''):
384 """construct the info dict for a given checkpoint"""
388 """construct the info dict for a given checkpoint"""
385 path = path.strip('/')
389 path = path.strip('/')
386 cp_path = self.get_checkpoint_path(checkpoint_id, name, path)
390 cp_path = self.get_checkpoint_path(checkpoint_id, name, path)
387 stats = os.stat(cp_path)
391 stats = os.stat(cp_path)
388 last_modified = tz.utcfromtimestamp(stats.st_mtime)
392 last_modified = tz.utcfromtimestamp(stats.st_mtime)
389 info = dict(
393 info = dict(
390 id = checkpoint_id,
394 id = checkpoint_id,
391 last_modified = last_modified,
395 last_modified = last_modified,
392 )
396 )
393 return info
397 return info
394
398
395 # public checkpoint API
399 # public checkpoint API
396
400
397 def create_checkpoint(self, name, path=''):
401 def create_checkpoint(self, name, path=''):
398 """Create a checkpoint from the current state of a notebook"""
402 """Create a checkpoint from the current state of a notebook"""
399 path = path.strip('/')
403 path = path.strip('/')
400 nb_path = self.get_os_path(name, path)
404 nb_path = self.get_os_path(name, path)
401 # only the one checkpoint ID:
405 # only the one checkpoint ID:
402 checkpoint_id = u"checkpoint"
406 checkpoint_id = u"checkpoint"
403 cp_path = self.get_checkpoint_path(checkpoint_id, name, path)
407 cp_path = self.get_checkpoint_path(checkpoint_id, name, path)
404 self.log.debug("creating checkpoint for notebook %s", name)
408 self.log.debug("creating checkpoint for notebook %s", name)
405 if not os.path.exists(self.checkpoint_dir):
409 if not os.path.exists(self.checkpoint_dir):
406 os.mkdir(self.checkpoint_dir)
410 os.mkdir(self.checkpoint_dir)
407 shutil.copy2(nb_path, cp_path)
411 shutil.copy2(nb_path, cp_path)
408
412
409 # return the checkpoint info
413 # return the checkpoint info
410 return self.get_checkpoint_model(checkpoint_id, name, path)
414 return self.get_checkpoint_model(checkpoint_id, name, path)
411
415
412 def list_checkpoints(self, name, path=''):
416 def list_checkpoints(self, name, path=''):
413 """list the checkpoints for a given notebook
417 """list the checkpoints for a given notebook
414
418
415 This notebook manager currently only supports one checkpoint per notebook.
419 This notebook manager currently only supports one checkpoint per notebook.
416 """
420 """
417 path = path.strip('/')
421 path = path.strip('/')
418 checkpoint_id = "checkpoint"
422 checkpoint_id = "checkpoint"
419 path = self.get_checkpoint_path(checkpoint_id, name, path)
423 path = self.get_checkpoint_path(checkpoint_id, name, path)
420 if not os.path.exists(path):
424 if not os.path.exists(path):
421 return []
425 return []
422 else:
426 else:
423 return [self.get_checkpoint_model(checkpoint_id, name, path)]
427 return [self.get_checkpoint_model(checkpoint_id, name, path)]
424
428
425
429
426 def restore_checkpoint(self, checkpoint_id, name, path=''):
430 def restore_checkpoint(self, checkpoint_id, name, path=''):
427 """restore a notebook to a checkpointed state"""
431 """restore a notebook to a checkpointed state"""
428 path = path.strip('/')
432 path = path.strip('/')
429 self.log.info("restoring Notebook %s from checkpoint %s", name, checkpoint_id)
433 self.log.info("restoring Notebook %s from checkpoint %s", name, checkpoint_id)
430 nb_path = self.get_os_path(name, path)
434 nb_path = self.get_os_path(name, path)
431 cp_path = self.get_checkpoint_path(checkpoint_id, name, path)
435 cp_path = self.get_checkpoint_path(checkpoint_id, name, path)
432 if not os.path.isfile(cp_path):
436 if not os.path.isfile(cp_path):
433 self.log.debug("checkpoint file does not exist: %s", cp_path)
437 self.log.debug("checkpoint file does not exist: %s", cp_path)
434 raise web.HTTPError(404,
438 raise web.HTTPError(404,
435 u'Notebook checkpoint does not exist: %s-%s' % (name, checkpoint_id)
439 u'Notebook checkpoint does not exist: %s-%s' % (name, checkpoint_id)
436 )
440 )
437 # ensure notebook is readable (never restore from an unreadable notebook)
441 # ensure notebook is readable (never restore from an unreadable notebook)
438 with io.open(cp_path, 'r', encoding='utf-8') as f:
442 with io.open(cp_path, 'r', encoding='utf-8') as f:
439 nb = current.read(f, u'json')
443 nb = current.read(f, u'json')
440 shutil.copy2(cp_path, nb_path)
444 shutil.copy2(cp_path, nb_path)
441 self.log.debug("copying %s -> %s", cp_path, nb_path)
445 self.log.debug("copying %s -> %s", cp_path, nb_path)
442
446
443 def delete_checkpoint(self, checkpoint_id, name, path=''):
447 def delete_checkpoint(self, checkpoint_id, name, path=''):
444 """delete a notebook's checkpoint"""
448 """delete a notebook's checkpoint"""
445 path = path.strip('/')
449 path = path.strip('/')
446 cp_path = self.get_checkpoint_path(checkpoint_id, name, path)
450 cp_path = self.get_checkpoint_path(checkpoint_id, name, path)
447 if not os.path.isfile(cp_path):
451 if not os.path.isfile(cp_path):
448 raise web.HTTPError(404,
452 raise web.HTTPError(404,
449 u'Notebook checkpoint does not exist: %s%s-%s' % (path, name, checkpoint_id)
453 u'Notebook checkpoint does not exist: %s%s-%s' % (path, name, checkpoint_id)
450 )
454 )
451 self.log.debug("unlinking %s", cp_path)
455 self.log.debug("unlinking %s", cp_path)
452 os.unlink(cp_path)
456 os.unlink(cp_path)
453
457
454 def info_string(self):
458 def info_string(self):
455 return "Serving notebooks from local directory: %s" % self.notebook_dir
459 return "Serving notebooks from local directory: %s" % self.notebook_dir
@@ -1,250 +1,251 b''
1 # coding: utf-8
1 # coding: utf-8
2 """Tests for the notebook manager."""
2 """Tests for the notebook manager."""
3 from __future__ import print_function
3 from __future__ import print_function
4
4
5 import os
5 import os
6
6
7 from tornado.web import HTTPError
7 from tornado.web import HTTPError
8 from unittest import TestCase
8 from unittest import TestCase
9 from tempfile import NamedTemporaryFile
9 from tempfile import NamedTemporaryFile
10
10
11 from IPython.utils.tempdir import TemporaryDirectory
11 from IPython.utils.tempdir import TemporaryDirectory
12 from IPython.utils.traitlets import TraitError
12 from IPython.utils.traitlets import TraitError
13 from IPython.html.utils import url_path_join
13 from IPython.html.utils import url_path_join
14
14
15 from ..filenbmanager import FileNotebookManager
15 from ..filenbmanager import FileNotebookManager
16 from ..nbmanager import NotebookManager
16 from ..nbmanager import NotebookManager
17
17
18
18 class TestFileNotebookManager(TestCase):
19 class TestFileNotebookManager(TestCase):
19
20
20 def test_nb_dir(self):
21 def test_nb_dir(self):
21 with TemporaryDirectory() as td:
22 with TemporaryDirectory() as td:
22 fm = FileNotebookManager(notebook_dir=td)
23 fm = FileNotebookManager(notebook_dir=td)
23 self.assertEqual(fm.notebook_dir, td)
24 self.assertEqual(fm.notebook_dir, td)
24
25
25 def test_create_nb_dir(self):
26 def test_create_nb_dir(self):
26 with TemporaryDirectory() as td:
27 with TemporaryDirectory() as td:
27 nbdir = os.path.join(td, 'notebooks')
28 nbdir = os.path.join(td, 'notebooks')
28 fm = FileNotebookManager(notebook_dir=nbdir)
29 fm = FileNotebookManager(notebook_dir=nbdir)
29 self.assertEqual(fm.notebook_dir, nbdir)
30 self.assertEqual(fm.notebook_dir, nbdir)
30
31
31 def test_missing_nb_dir(self):
32 def test_missing_nb_dir(self):
32 with TemporaryDirectory() as td:
33 with TemporaryDirectory() as td:
33 nbdir = os.path.join(td, 'notebook', 'dir', 'is', 'missing')
34 nbdir = os.path.join(td, 'notebook', 'dir', 'is', 'missing')
34 self.assertRaises(TraitError, FileNotebookManager, notebook_dir=nbdir)
35 self.assertRaises(TraitError, FileNotebookManager, notebook_dir=nbdir)
35
36
36 def test_invalid_nb_dir(self):
37 def test_invalid_nb_dir(self):
37 with NamedTemporaryFile() as tf:
38 with NamedTemporaryFile() as tf:
38 self.assertRaises(TraitError, FileNotebookManager, notebook_dir=tf.name)
39 self.assertRaises(TraitError, FileNotebookManager, notebook_dir=tf.name)
39
40
40 def test_get_os_path(self):
41 def test_get_os_path(self):
41 # full filesystem path should be returned with correct operating system
42 # full filesystem path should be returned with correct operating system
42 # separators.
43 # separators.
43 with TemporaryDirectory() as td:
44 with TemporaryDirectory() as td:
44 nbdir = os.path.join(td, 'notebooks')
45 nbdir = os.path.join(td, 'notebooks')
45 fm = FileNotebookManager(notebook_dir=nbdir)
46 fm = FileNotebookManager(notebook_dir=nbdir)
46 path = fm.get_os_path('test.ipynb', '/path/to/notebook/')
47 path = fm.get_os_path('test.ipynb', '/path/to/notebook/')
47 rel_path_list = '/path/to/notebook/test.ipynb'.split('/')
48 rel_path_list = '/path/to/notebook/test.ipynb'.split('/')
48 fs_path = os.path.join(fm.notebook_dir, *rel_path_list)
49 fs_path = os.path.join(fm.notebook_dir, *rel_path_list)
49 self.assertEqual(path, fs_path)
50 self.assertEqual(path, fs_path)
50
51
51 fm = FileNotebookManager(notebook_dir=nbdir)
52 fm = FileNotebookManager(notebook_dir=nbdir)
52 path = fm.get_os_path('test.ipynb')
53 path = fm.get_os_path('test.ipynb')
53 fs_path = os.path.join(fm.notebook_dir, 'test.ipynb')
54 fs_path = os.path.join(fm.notebook_dir, 'test.ipynb')
54 self.assertEqual(path, fs_path)
55 self.assertEqual(path, fs_path)
55
56
56 fm = FileNotebookManager(notebook_dir=nbdir)
57 fm = FileNotebookManager(notebook_dir=nbdir)
57 path = fm.get_os_path('test.ipynb', '////')
58 path = fm.get_os_path('test.ipynb', '////')
58 fs_path = os.path.join(fm.notebook_dir, 'test.ipynb')
59 fs_path = os.path.join(fm.notebook_dir, 'test.ipynb')
59 self.assertEqual(path, fs_path)
60 self.assertEqual(path, fs_path)
60
61
61 class TestNotebookManager(TestCase):
62 class TestNotebookManager(TestCase):
62
63
63 def make_dir(self, abs_path, rel_path):
64 def make_dir(self, abs_path, rel_path):
64 """make subdirectory, rel_path is the relative path
65 """make subdirectory, rel_path is the relative path
65 to that directory from the location where the server started"""
66 to that directory from the location where the server started"""
66 os_path = os.path.join(abs_path, rel_path)
67 os_path = os.path.join(abs_path, rel_path)
67 try:
68 try:
68 os.makedirs(os_path)
69 os.makedirs(os_path)
69 except OSError:
70 except OSError:
70 print("Directory already exists.")
71 print("Directory already exists.")
71
72
72 def test_create_notebook_model(self):
73 def test_create_notebook_model(self):
73 with TemporaryDirectory() as td:
74 with TemporaryDirectory() as td:
74 # Test in root directory
75 # Test in root directory
75 nm = FileNotebookManager(notebook_dir=td)
76 nm = FileNotebookManager(notebook_dir=td)
76 model = nm.create_notebook_model()
77 model = nm.create_notebook_model()
77 assert isinstance(model, dict)
78 assert isinstance(model, dict)
78 self.assertIn('name', model)
79 self.assertIn('name', model)
79 self.assertIn('path', model)
80 self.assertIn('path', model)
80 self.assertEqual(model['name'], 'Untitled0.ipynb')
81 self.assertEqual(model['name'], 'Untitled0.ipynb')
81 self.assertEqual(model['path'], '')
82 self.assertEqual(model['path'], '')
82
83
83 # Test in sub-directory
84 # Test in sub-directory
84 sub_dir = '/foo/'
85 sub_dir = '/foo/'
85 self.make_dir(nm.notebook_dir, 'foo')
86 self.make_dir(nm.notebook_dir, 'foo')
86 model = nm.create_notebook_model(None, sub_dir)
87 model = nm.create_notebook_model(None, sub_dir)
87 assert isinstance(model, dict)
88 assert isinstance(model, dict)
88 self.assertIn('name', model)
89 self.assertIn('name', model)
89 self.assertIn('path', model)
90 self.assertIn('path', model)
90 self.assertEqual(model['name'], 'Untitled0.ipynb')
91 self.assertEqual(model['name'], 'Untitled0.ipynb')
91 self.assertEqual(model['path'], sub_dir.strip('/'))
92 self.assertEqual(model['path'], sub_dir.strip('/'))
92
93
93 def test_get_notebook_model(self):
94 def test_get_notebook_model(self):
94 with TemporaryDirectory() as td:
95 with TemporaryDirectory() as td:
95 # Test in root directory
96 # Test in root directory
96 # Create a notebook
97 # Create a notebook
97 nm = FileNotebookManager(notebook_dir=td)
98 nm = FileNotebookManager(notebook_dir=td)
98 model = nm.create_notebook_model()
99 model = nm.create_notebook_model()
99 name = model['name']
100 name = model['name']
100 path = model['path']
101 path = model['path']
101
102
102 # Check that we 'get' on the notebook we just created
103 # Check that we 'get' on the notebook we just created
103 model2 = nm.get_notebook_model(name, path)
104 model2 = nm.get_notebook_model(name, path)
104 assert isinstance(model2, dict)
105 assert isinstance(model2, dict)
105 self.assertIn('name', model2)
106 self.assertIn('name', model2)
106 self.assertIn('path', model2)
107 self.assertIn('path', model2)
107 self.assertEqual(model['name'], name)
108 self.assertEqual(model['name'], name)
108 self.assertEqual(model['path'], path)
109 self.assertEqual(model['path'], path)
109
110
110 # Test in sub-directory
111 # Test in sub-directory
111 sub_dir = '/foo/'
112 sub_dir = '/foo/'
112 self.make_dir(nm.notebook_dir, 'foo')
113 self.make_dir(nm.notebook_dir, 'foo')
113 model = nm.create_notebook_model(None, sub_dir)
114 model = nm.create_notebook_model(None, sub_dir)
114 model2 = nm.get_notebook_model(name, sub_dir)
115 model2 = nm.get_notebook_model(name, sub_dir)
115 assert isinstance(model2, dict)
116 assert isinstance(model2, dict)
116 self.assertIn('name', model2)
117 self.assertIn('name', model2)
117 self.assertIn('path', model2)
118 self.assertIn('path', model2)
118 self.assertIn('content', model2)
119 self.assertIn('content', model2)
119 self.assertEqual(model2['name'], 'Untitled0.ipynb')
120 self.assertEqual(model2['name'], 'Untitled0.ipynb')
120 self.assertEqual(model2['path'], sub_dir.strip('/'))
121 self.assertEqual(model2['path'], sub_dir.strip('/'))
121
122
122 def test_update_notebook_model(self):
123 def test_update_notebook_model(self):
123 with TemporaryDirectory() as td:
124 with TemporaryDirectory() as td:
124 # Test in root directory
125 # Test in root directory
125 # Create a notebook
126 # Create a notebook
126 nm = FileNotebookManager(notebook_dir=td)
127 nm = FileNotebookManager(notebook_dir=td)
127 model = nm.create_notebook_model()
128 model = nm.create_notebook_model()
128 name = model['name']
129 name = model['name']
129 path = model['path']
130 path = model['path']
130
131
131 # Change the name in the model for rename
132 # Change the name in the model for rename
132 model['name'] = 'test.ipynb'
133 model['name'] = 'test.ipynb'
133 model = nm.update_notebook_model(model, name, path)
134 model = nm.update_notebook_model(model, name, path)
134 assert isinstance(model, dict)
135 assert isinstance(model, dict)
135 self.assertIn('name', model)
136 self.assertIn('name', model)
136 self.assertIn('path', model)
137 self.assertIn('path', model)
137 self.assertEqual(model['name'], 'test.ipynb')
138 self.assertEqual(model['name'], 'test.ipynb')
138
139
139 # Make sure the old name is gone
140 # Make sure the old name is gone
140 self.assertRaises(HTTPError, nm.get_notebook_model, name, path)
141 self.assertRaises(HTTPError, nm.get_notebook_model, name, path)
141
142
142 # Test in sub-directory
143 # Test in sub-directory
143 # Create a directory and notebook in that directory
144 # Create a directory and notebook in that directory
144 sub_dir = '/foo/'
145 sub_dir = '/foo/'
145 self.make_dir(nm.notebook_dir, 'foo')
146 self.make_dir(nm.notebook_dir, 'foo')
146 model = nm.create_notebook_model(None, sub_dir)
147 model = nm.create_notebook_model(None, sub_dir)
147 name = model['name']
148 name = model['name']
148 path = model['path']
149 path = model['path']
149
150
150 # Change the name in the model for rename
151 # Change the name in the model for rename
151 model['name'] = 'test_in_sub.ipynb'
152 model['name'] = 'test_in_sub.ipynb'
152 model = nm.update_notebook_model(model, name, path)
153 model = nm.update_notebook_model(model, name, path)
153 assert isinstance(model, dict)
154 assert isinstance(model, dict)
154 self.assertIn('name', model)
155 self.assertIn('name', model)
155 self.assertIn('path', model)
156 self.assertIn('path', model)
156 self.assertEqual(model['name'], 'test_in_sub.ipynb')
157 self.assertEqual(model['name'], 'test_in_sub.ipynb')
157 self.assertEqual(model['path'], sub_dir.strip('/'))
158 self.assertEqual(model['path'], sub_dir.strip('/'))
158
159
159 # Make sure the old name is gone
160 # Make sure the old name is gone
160 self.assertRaises(HTTPError, nm.get_notebook_model, name, path)
161 self.assertRaises(HTTPError, nm.get_notebook_model, name, path)
161
162
162 def test_save_notebook_model(self):
163 def test_save_notebook_model(self):
163 with TemporaryDirectory() as td:
164 with TemporaryDirectory() as td:
164 # Test in the root directory
165 # Test in the root directory
165 # Create a notebook
166 # Create a notebook
166 nm = FileNotebookManager(notebook_dir=td)
167 nm = FileNotebookManager(notebook_dir=td)
167 model = nm.create_notebook_model()
168 model = nm.create_notebook_model()
168 name = model['name']
169 name = model['name']
169 path = model['path']
170 path = model['path']
170
171
171 # Get the model with 'content'
172 # Get the model with 'content'
172 full_model = nm.get_notebook_model(name, path)
173 full_model = nm.get_notebook_model(name, path)
173
174
174 # Save the notebook
175 # Save the notebook
175 model = nm.save_notebook_model(full_model, name, path)
176 model = nm.save_notebook_model(full_model, name, path)
176 assert isinstance(model, dict)
177 assert isinstance(model, dict)
177 self.assertIn('name', model)
178 self.assertIn('name', model)
178 self.assertIn('path', model)
179 self.assertIn('path', model)
179 self.assertEqual(model['name'], name)
180 self.assertEqual(model['name'], name)
180 self.assertEqual(model['path'], path)
181 self.assertEqual(model['path'], path)
181
182
182 # Test in sub-directory
183 # Test in sub-directory
183 # Create a directory and notebook in that directory
184 # Create a directory and notebook in that directory
184 sub_dir = '/foo/'
185 sub_dir = '/foo/'
185 self.make_dir(nm.notebook_dir, 'foo')
186 self.make_dir(nm.notebook_dir, 'foo')
186 model = nm.create_notebook_model(None, sub_dir)
187 model = nm.create_notebook_model(None, sub_dir)
187 name = model['name']
188 name = model['name']
188 path = model['path']
189 path = model['path']
189 model = nm.get_notebook_model(name, path)
190 model = nm.get_notebook_model(name, path)
190
191
191 # Change the name in the model for rename
192 # Change the name in the model for rename
192 model = nm.save_notebook_model(model, name, path)
193 model = nm.save_notebook_model(model, name, path)
193 assert isinstance(model, dict)
194 assert isinstance(model, dict)
194 self.assertIn('name', model)
195 self.assertIn('name', model)
195 self.assertIn('path', model)
196 self.assertIn('path', model)
196 self.assertEqual(model['name'], 'Untitled0.ipynb')
197 self.assertEqual(model['name'], 'Untitled0.ipynb')
197 self.assertEqual(model['path'], sub_dir.strip('/'))
198 self.assertEqual(model['path'], sub_dir.strip('/'))
198
199
199 def test_save_notebook_with_script(self):
200 def test_save_notebook_with_script(self):
200 with TemporaryDirectory() as td:
201 with TemporaryDirectory() as td:
201 # Create a notebook
202 # Create a notebook
202 nm = FileNotebookManager(notebook_dir=td)
203 nm = FileNotebookManager(notebook_dir=td)
203 nm.save_script = True
204 nm.save_script = True
204 model = nm.create_notebook_model()
205 model = nm.create_notebook_model()
205 name = model['name']
206 name = model['name']
206 path = model['path']
207 path = model['path']
207
208
208 # Get the model with 'content'
209 # Get the model with 'content'
209 full_model = nm.get_notebook_model(name, path)
210 full_model = nm.get_notebook_model(name, path)
210
211
211 # Save the notebook
212 # Save the notebook
212 model = nm.save_notebook_model(full_model, name, path)
213 model = nm.save_notebook_model(full_model, name, path)
213
214
214 # Check that the script was created
215 # Check that the script was created
215 py_path = os.path.join(td, os.path.splitext(name)[0]+'.py')
216 py_path = os.path.join(td, os.path.splitext(name)[0]+'.py')
216 assert os.path.exists(py_path), py_path
217 assert os.path.exists(py_path), py_path
217
218
218 def test_delete_notebook_model(self):
219 def test_delete_notebook_model(self):
219 with TemporaryDirectory() as td:
220 with TemporaryDirectory() as td:
220 # Test in the root directory
221 # Test in the root directory
221 # Create a notebook
222 # Create a notebook
222 nm = FileNotebookManager(notebook_dir=td)
223 nm = FileNotebookManager(notebook_dir=td)
223 model = nm.create_notebook_model()
224 model = nm.create_notebook_model()
224 name = model['name']
225 name = model['name']
225 path = model['path']
226 path = model['path']
226
227
227 # Delete the notebook
228 # Delete the notebook
228 nm.delete_notebook_model(name, path)
229 nm.delete_notebook_model(name, path)
229
230
230 # Check that a 'get' on the deleted notebook raises and error
231 # Check that a 'get' on the deleted notebook raises and error
231 self.assertRaises(HTTPError, nm.get_notebook_model, name, path)
232 self.assertRaises(HTTPError, nm.get_notebook_model, name, path)
232
233
233 def test_copy_notebook(self):
234 def test_copy_notebook(self):
234 with TemporaryDirectory() as td:
235 with TemporaryDirectory() as td:
235 # Test in the root directory
236 # Test in the root directory
236 # Create a notebook
237 # Create a notebook
237 nm = FileNotebookManager(notebook_dir=td)
238 nm = FileNotebookManager(notebook_dir=td)
238 path = u'Γ₯ b'
239 path = u'Γ₯ b'
239 name = u'nb √.ipynb'
240 name = u'nb √.ipynb'
240 os.mkdir(os.path.join(td, path))
241 os.mkdir(os.path.join(td, path))
241 orig = nm.create_notebook_model({'name' : name}, path=path)
242 orig = nm.create_notebook_model({'name' : name}, path=path)
242
243
243 # copy with unspecified name
244 # copy with unspecified name
244 copy = nm.copy_notebook(name, path=path)
245 copy = nm.copy_notebook(name, path=path)
245 self.assertEqual(copy['name'], orig['name'].replace('.ipynb', '-Copy0.ipynb'))
246 self.assertEqual(copy['name'], orig['name'].replace('.ipynb', '-Copy0.ipynb'))
246
247
247 # copy with specified name
248 # copy with specified name
248 copy2 = nm.copy_notebook(name, u'copy 2.ipynb', path=path)
249 copy2 = nm.copy_notebook(name, u'copy 2.ipynb', path=path)
249 self.assertEqual(copy2['name'], u'copy 2.ipynb')
250 self.assertEqual(copy2['name'], u'copy 2.ipynb')
250
251
@@ -1,323 +1,329 b''
1 # coding: utf-8
1 # coding: utf-8
2 """Test the notebooks webservice API."""
2 """Test the notebooks webservice API."""
3
3
4 import io
4 import io
5 import json
5 import json
6 import os
6 import os
7 import shutil
7 import shutil
8 from unicodedata import normalize
8 from unicodedata import normalize
9
9
10 pjoin = os.path.join
10 pjoin = os.path.join
11
11
12 import requests
12 import requests
13
13
14 from IPython.html.utils import url_path_join, url_escape
14 from IPython.html.utils import url_path_join, url_escape
15 from IPython.html.tests.launchnotebook import NotebookTestBase, assert_http_error
15 from IPython.html.tests.launchnotebook import NotebookTestBase, assert_http_error
16 from IPython.nbformat import current
16 from IPython.nbformat import current
17 from IPython.nbformat.current import (new_notebook, write, read, new_worksheet,
17 from IPython.nbformat.current import (new_notebook, write, read, new_worksheet,
18 new_heading_cell, to_notebook_json)
18 new_heading_cell, to_notebook_json)
19 from IPython.nbformat import v2
19 from IPython.nbformat import v2
20 from IPython.utils import py3compat
20 from IPython.utils import py3compat
21 from IPython.utils.data import uniq_stable
21 from IPython.utils.data import uniq_stable
22
22
23
23
24 # TODO: Remove this after we create the contents web service and directories are
25 # no longer listed by the notebook web service.
26 def notebooks_only(nb_list):
27 return [nb for nb in nb_list if 'type' not in nb]
28
29
24 class NBAPI(object):
30 class NBAPI(object):
25 """Wrapper for notebook API calls."""
31 """Wrapper for notebook API calls."""
26 def __init__(self, base_url):
32 def __init__(self, base_url):
27 self.base_url = base_url
33 self.base_url = base_url
28
34
29 def _req(self, verb, path, body=None):
35 def _req(self, verb, path, body=None):
30 response = requests.request(verb,
36 response = requests.request(verb,
31 url_path_join(self.base_url, 'api/notebooks', path),
37 url_path_join(self.base_url, 'api/notebooks', path),
32 data=body,
38 data=body,
33 )
39 )
34 response.raise_for_status()
40 response.raise_for_status()
35 return response
41 return response
36
42
37 def list(self, path='/'):
43 def list(self, path='/'):
38 return self._req('GET', path)
44 return self._req('GET', path)
39
45
40 def read(self, name, path='/'):
46 def read(self, name, path='/'):
41 return self._req('GET', url_path_join(path, name))
47 return self._req('GET', url_path_join(path, name))
42
48
43 def create_untitled(self, path='/'):
49 def create_untitled(self, path='/'):
44 return self._req('POST', path)
50 return self._req('POST', path)
45
51
46 def upload_untitled(self, body, path='/'):
52 def upload_untitled(self, body, path='/'):
47 return self._req('POST', path, body)
53 return self._req('POST', path, body)
48
54
49 def copy_untitled(self, copy_from, path='/'):
55 def copy_untitled(self, copy_from, path='/'):
50 body = json.dumps({'copy_from':copy_from})
56 body = json.dumps({'copy_from':copy_from})
51 return self._req('POST', path, body)
57 return self._req('POST', path, body)
52
58
53 def create(self, name, path='/'):
59 def create(self, name, path='/'):
54 return self._req('PUT', url_path_join(path, name))
60 return self._req('PUT', url_path_join(path, name))
55
61
56 def upload(self, name, body, path='/'):
62 def upload(self, name, body, path='/'):
57 return self._req('PUT', url_path_join(path, name), body)
63 return self._req('PUT', url_path_join(path, name), body)
58
64
59 def copy(self, copy_from, copy_to, path='/'):
65 def copy(self, copy_from, copy_to, path='/'):
60 body = json.dumps({'copy_from':copy_from})
66 body = json.dumps({'copy_from':copy_from})
61 return self._req('PUT', url_path_join(path, copy_to), body)
67 return self._req('PUT', url_path_join(path, copy_to), body)
62
68
63 def save(self, name, body, path='/'):
69 def save(self, name, body, path='/'):
64 return self._req('PUT', url_path_join(path, name), body)
70 return self._req('PUT', url_path_join(path, name), body)
65
71
66 def delete(self, name, path='/'):
72 def delete(self, name, path='/'):
67 return self._req('DELETE', url_path_join(path, name))
73 return self._req('DELETE', url_path_join(path, name))
68
74
69 def rename(self, name, path, new_name):
75 def rename(self, name, path, new_name):
70 body = json.dumps({'name': new_name})
76 body = json.dumps({'name': new_name})
71 return self._req('PATCH', url_path_join(path, name), body)
77 return self._req('PATCH', url_path_join(path, name), body)
72
78
73 def get_checkpoints(self, name, path):
79 def get_checkpoints(self, name, path):
74 return self._req('GET', url_path_join(path, name, 'checkpoints'))
80 return self._req('GET', url_path_join(path, name, 'checkpoints'))
75
81
76 def new_checkpoint(self, name, path):
82 def new_checkpoint(self, name, path):
77 return self._req('POST', url_path_join(path, name, 'checkpoints'))
83 return self._req('POST', url_path_join(path, name, 'checkpoints'))
78
84
79 def restore_checkpoint(self, name, path, checkpoint_id):
85 def restore_checkpoint(self, name, path, checkpoint_id):
80 return self._req('POST', url_path_join(path, name, 'checkpoints', checkpoint_id))
86 return self._req('POST', url_path_join(path, name, 'checkpoints', checkpoint_id))
81
87
82 def delete_checkpoint(self, name, path, checkpoint_id):
88 def delete_checkpoint(self, name, path, checkpoint_id):
83 return self._req('DELETE', url_path_join(path, name, 'checkpoints', checkpoint_id))
89 return self._req('DELETE', url_path_join(path, name, 'checkpoints', checkpoint_id))
84
90
85 class APITest(NotebookTestBase):
91 class APITest(NotebookTestBase):
86 """Test the kernels web service API"""
92 """Test the kernels web service API"""
87 dirs_nbs = [('', 'inroot'),
93 dirs_nbs = [('', 'inroot'),
88 ('Directory with spaces in', 'inspace'),
94 ('Directory with spaces in', 'inspace'),
89 (u'unicodΓ©', 'innonascii'),
95 (u'unicodΓ©', 'innonascii'),
90 ('foo', 'a'),
96 ('foo', 'a'),
91 ('foo', 'b'),
97 ('foo', 'b'),
92 ('foo', 'name with spaces'),
98 ('foo', 'name with spaces'),
93 ('foo', u'unicodΓ©'),
99 ('foo', u'unicodΓ©'),
94 ('foo/bar', 'baz'),
100 ('foo/bar', 'baz'),
95 (u'Γ₯ b', u'Γ§ d')
101 (u'Γ₯ b', u'Γ§ d')
96 ]
102 ]
97
103
98 dirs = uniq_stable([d for (d,n) in dirs_nbs])
104 dirs = uniq_stable([d for (d,n) in dirs_nbs])
99 del dirs[0] # remove ''
105 del dirs[0] # remove ''
100
106
101 def setUp(self):
107 def setUp(self):
102 nbdir = self.notebook_dir.name
108 nbdir = self.notebook_dir.name
103
109
104 for d in self.dirs:
110 for d in self.dirs:
105 d.replace('/', os.sep)
111 d.replace('/', os.sep)
106 if not os.path.isdir(pjoin(nbdir, d)):
112 if not os.path.isdir(pjoin(nbdir, d)):
107 os.mkdir(pjoin(nbdir, d))
113 os.mkdir(pjoin(nbdir, d))
108
114
109 for d, name in self.dirs_nbs:
115 for d, name in self.dirs_nbs:
110 d = d.replace('/', os.sep)
116 d = d.replace('/', os.sep)
111 with io.open(pjoin(nbdir, d, '%s.ipynb' % name), 'w',
117 with io.open(pjoin(nbdir, d, '%s.ipynb' % name), 'w',
112 encoding='utf-8') as f:
118 encoding='utf-8') as f:
113 nb = new_notebook(name=name)
119 nb = new_notebook(name=name)
114 write(nb, f, format='ipynb')
120 write(nb, f, format='ipynb')
115
121
116 self.nb_api = NBAPI(self.base_url())
122 self.nb_api = NBAPI(self.base_url())
117
123
118 def tearDown(self):
124 def tearDown(self):
119 nbdir = self.notebook_dir.name
125 nbdir = self.notebook_dir.name
120
126
121 for dname in ['foo', 'Directory with spaces in', u'unicodΓ©', u'Γ₯ b']:
127 for dname in ['foo', 'Directory with spaces in', u'unicodΓ©', u'Γ₯ b']:
122 shutil.rmtree(pjoin(nbdir, dname), ignore_errors=True)
128 shutil.rmtree(pjoin(nbdir, dname), ignore_errors=True)
123
129
124 if os.path.isfile(pjoin(nbdir, 'inroot.ipynb')):
130 if os.path.isfile(pjoin(nbdir, 'inroot.ipynb')):
125 os.unlink(pjoin(nbdir, 'inroot.ipynb'))
131 os.unlink(pjoin(nbdir, 'inroot.ipynb'))
126
132
127 def test_list_notebooks(self):
133 def test_list_notebooks(self):
128 nbs = self.nb_api.list().json()
134 nbs = notebooks_only(self.nb_api.list().json())
129 self.assertEqual(len(nbs), 1)
135 self.assertEqual(len(nbs), 1)
130 self.assertEqual(nbs[0]['name'], 'inroot.ipynb')
136 self.assertEqual(nbs[0]['name'], 'inroot.ipynb')
131
137
132 nbs = self.nb_api.list('/Directory with spaces in/').json()
138 nbs = notebooks_only(self.nb_api.list('/Directory with spaces in/').json())
133 self.assertEqual(len(nbs), 1)
139 self.assertEqual(len(nbs), 1)
134 self.assertEqual(nbs[0]['name'], 'inspace.ipynb')
140 self.assertEqual(nbs[0]['name'], 'inspace.ipynb')
135
141
136 nbs = self.nb_api.list(u'/unicodΓ©/').json()
142 nbs = notebooks_only(self.nb_api.list(u'/unicodΓ©/').json())
137 self.assertEqual(len(nbs), 1)
143 self.assertEqual(len(nbs), 1)
138 self.assertEqual(nbs[0]['name'], 'innonascii.ipynb')
144 self.assertEqual(nbs[0]['name'], 'innonascii.ipynb')
139 self.assertEqual(nbs[0]['path'], u'unicodΓ©')
145 self.assertEqual(nbs[0]['path'], u'unicodΓ©')
140
146
141 nbs = self.nb_api.list('/foo/bar/').json()
147 nbs = notebooks_only(self.nb_api.list('/foo/bar/').json())
142 self.assertEqual(len(nbs), 1)
148 self.assertEqual(len(nbs), 1)
143 self.assertEqual(nbs[0]['name'], 'baz.ipynb')
149 self.assertEqual(nbs[0]['name'], 'baz.ipynb')
144 self.assertEqual(nbs[0]['path'], 'foo/bar')
150 self.assertEqual(nbs[0]['path'], 'foo/bar')
145
151
146 nbs = self.nb_api.list('foo').json()
152 nbs = notebooks_only(self.nb_api.list('foo').json())
147 self.assertEqual(len(nbs), 4)
153 self.assertEqual(len(nbs), 4)
148 nbnames = { normalize('NFC', n['name']) for n in nbs }
154 nbnames = { normalize('NFC', n['name']) for n in nbs }
149 expected = [ u'a.ipynb', u'b.ipynb', u'name with spaces.ipynb', u'unicodΓ©.ipynb']
155 expected = [ u'a.ipynb', u'b.ipynb', u'name with spaces.ipynb', u'unicodΓ©.ipynb']
150 expected = { normalize('NFC', name) for name in expected }
156 expected = { normalize('NFC', name) for name in expected }
151 self.assertEqual(nbnames, expected)
157 self.assertEqual(nbnames, expected)
152
158
153 def test_list_nonexistant_dir(self):
159 def test_list_nonexistant_dir(self):
154 with assert_http_error(404):
160 with assert_http_error(404):
155 self.nb_api.list('nonexistant')
161 self.nb_api.list('nonexistant')
156
162
157 def test_get_contents(self):
163 def test_get_contents(self):
158 for d, name in self.dirs_nbs:
164 for d, name in self.dirs_nbs:
159 nb = self.nb_api.read('%s.ipynb' % name, d+'/').json()
165 nb = self.nb_api.read('%s.ipynb' % name, d+'/').json()
160 self.assertEqual(nb['name'], u'%s.ipynb' % name)
166 self.assertEqual(nb['name'], u'%s.ipynb' % name)
161 self.assertIn('content', nb)
167 self.assertIn('content', nb)
162 self.assertIn('metadata', nb['content'])
168 self.assertIn('metadata', nb['content'])
163 self.assertIsInstance(nb['content']['metadata'], dict)
169 self.assertIsInstance(nb['content']['metadata'], dict)
164
170
165 # Name that doesn't exist - should be a 404
171 # Name that doesn't exist - should be a 404
166 with assert_http_error(404):
172 with assert_http_error(404):
167 self.nb_api.read('q.ipynb', 'foo')
173 self.nb_api.read('q.ipynb', 'foo')
168
174
169 def _check_nb_created(self, resp, name, path):
175 def _check_nb_created(self, resp, name, path):
170 self.assertEqual(resp.status_code, 201)
176 self.assertEqual(resp.status_code, 201)
171 location_header = py3compat.str_to_unicode(resp.headers['Location'])
177 location_header = py3compat.str_to_unicode(resp.headers['Location'])
172 self.assertEqual(location_header, url_escape(url_path_join(u'/api/notebooks', path, name)))
178 self.assertEqual(location_header, url_escape(url_path_join(u'/api/notebooks', path, name)))
173 self.assertEqual(resp.json()['name'], name)
179 self.assertEqual(resp.json()['name'], name)
174 assert os.path.isfile(pjoin(
180 assert os.path.isfile(pjoin(
175 self.notebook_dir.name,
181 self.notebook_dir.name,
176 path.replace('/', os.sep),
182 path.replace('/', os.sep),
177 name,
183 name,
178 ))
184 ))
179
185
180 def test_create_untitled(self):
186 def test_create_untitled(self):
181 resp = self.nb_api.create_untitled(path=u'Γ₯ b')
187 resp = self.nb_api.create_untitled(path=u'Γ₯ b')
182 self._check_nb_created(resp, 'Untitled0.ipynb', u'Γ₯ b')
188 self._check_nb_created(resp, 'Untitled0.ipynb', u'Γ₯ b')
183
189
184 # Second time
190 # Second time
185 resp = self.nb_api.create_untitled(path=u'Γ₯ b')
191 resp = self.nb_api.create_untitled(path=u'Γ₯ b')
186 self._check_nb_created(resp, 'Untitled1.ipynb', u'Γ₯ b')
192 self._check_nb_created(resp, 'Untitled1.ipynb', u'Γ₯ b')
187
193
188 # And two directories down
194 # And two directories down
189 resp = self.nb_api.create_untitled(path='foo/bar')
195 resp = self.nb_api.create_untitled(path='foo/bar')
190 self._check_nb_created(resp, 'Untitled0.ipynb', 'foo/bar')
196 self._check_nb_created(resp, 'Untitled0.ipynb', 'foo/bar')
191
197
192 def test_upload_untitled(self):
198 def test_upload_untitled(self):
193 nb = new_notebook(name='Upload test')
199 nb = new_notebook(name='Upload test')
194 nbmodel = {'content': nb}
200 nbmodel = {'content': nb}
195 resp = self.nb_api.upload_untitled(path=u'Γ₯ b',
201 resp = self.nb_api.upload_untitled(path=u'Γ₯ b',
196 body=json.dumps(nbmodel))
202 body=json.dumps(nbmodel))
197 self._check_nb_created(resp, 'Untitled0.ipynb', u'Γ₯ b')
203 self._check_nb_created(resp, 'Untitled0.ipynb', u'Γ₯ b')
198
204
199 def test_upload(self):
205 def test_upload(self):
200 nb = new_notebook(name=u'ignored')
206 nb = new_notebook(name=u'ignored')
201 nbmodel = {'content': nb}
207 nbmodel = {'content': nb}
202 resp = self.nb_api.upload(u'Upload tΓ©st.ipynb', path=u'Γ₯ b',
208 resp = self.nb_api.upload(u'Upload tΓ©st.ipynb', path=u'Γ₯ b',
203 body=json.dumps(nbmodel))
209 body=json.dumps(nbmodel))
204 self._check_nb_created(resp, u'Upload tΓ©st.ipynb', u'Γ₯ b')
210 self._check_nb_created(resp, u'Upload tΓ©st.ipynb', u'Γ₯ b')
205
211
206 def test_upload_v2(self):
212 def test_upload_v2(self):
207 nb = v2.new_notebook()
213 nb = v2.new_notebook()
208 ws = v2.new_worksheet()
214 ws = v2.new_worksheet()
209 nb.worksheets.append(ws)
215 nb.worksheets.append(ws)
210 ws.cells.append(v2.new_code_cell(input='print("hi")'))
216 ws.cells.append(v2.new_code_cell(input='print("hi")'))
211 nbmodel = {'content': nb}
217 nbmodel = {'content': nb}
212 resp = self.nb_api.upload(u'Upload tΓ©st.ipynb', path=u'Γ₯ b',
218 resp = self.nb_api.upload(u'Upload tΓ©st.ipynb', path=u'Γ₯ b',
213 body=json.dumps(nbmodel))
219 body=json.dumps(nbmodel))
214 self._check_nb_created(resp, u'Upload tΓ©st.ipynb', u'Γ₯ b')
220 self._check_nb_created(resp, u'Upload tΓ©st.ipynb', u'Γ₯ b')
215 resp = self.nb_api.read(u'Upload tΓ©st.ipynb', u'Γ₯ b')
221 resp = self.nb_api.read(u'Upload tΓ©st.ipynb', u'Γ₯ b')
216 data = resp.json()
222 data = resp.json()
217 self.assertEqual(data['content']['nbformat'], current.nbformat)
223 self.assertEqual(data['content']['nbformat'], current.nbformat)
218 self.assertEqual(data['content']['orig_nbformat'], 2)
224 self.assertEqual(data['content']['orig_nbformat'], 2)
219
225
220 def test_copy_untitled(self):
226 def test_copy_untitled(self):
221 resp = self.nb_api.copy_untitled(u'Γ§ d.ipynb', path=u'Γ₯ b')
227 resp = self.nb_api.copy_untitled(u'Γ§ d.ipynb', path=u'Γ₯ b')
222 self._check_nb_created(resp, u'Γ§ d-Copy0.ipynb', u'Γ₯ b')
228 self._check_nb_created(resp, u'Γ§ d-Copy0.ipynb', u'Γ₯ b')
223
229
224 def test_copy(self):
230 def test_copy(self):
225 resp = self.nb_api.copy(u'Γ§ d.ipynb', u'cΓΈpy.ipynb', path=u'Γ₯ b')
231 resp = self.nb_api.copy(u'Γ§ d.ipynb', u'cΓΈpy.ipynb', path=u'Γ₯ b')
226 self._check_nb_created(resp, u'cΓΈpy.ipynb', u'Γ₯ b')
232 self._check_nb_created(resp, u'cΓΈpy.ipynb', u'Γ₯ b')
227
233
228 def test_delete(self):
234 def test_delete(self):
229 for d, name in self.dirs_nbs:
235 for d, name in self.dirs_nbs:
230 resp = self.nb_api.delete('%s.ipynb' % name, d)
236 resp = self.nb_api.delete('%s.ipynb' % name, d)
231 self.assertEqual(resp.status_code, 204)
237 self.assertEqual(resp.status_code, 204)
232
238
233 for d in self.dirs + ['/']:
239 for d in self.dirs + ['/']:
234 nbs = self.nb_api.list(d).json()
240 nbs = notebooks_only(self.nb_api.list(d).json())
235 self.assertEqual(len(nbs), 0)
241 self.assertEqual(len(nbs), 0)
236
242
237 def test_rename(self):
243 def test_rename(self):
238 resp = self.nb_api.rename('a.ipynb', 'foo', 'z.ipynb')
244 resp = self.nb_api.rename('a.ipynb', 'foo', 'z.ipynb')
239 self.assertEqual(resp.headers['Location'].split('/')[-1], 'z.ipynb')
245 self.assertEqual(resp.headers['Location'].split('/')[-1], 'z.ipynb')
240 self.assertEqual(resp.json()['name'], 'z.ipynb')
246 self.assertEqual(resp.json()['name'], 'z.ipynb')
241 assert os.path.isfile(pjoin(self.notebook_dir.name, 'foo', 'z.ipynb'))
247 assert os.path.isfile(pjoin(self.notebook_dir.name, 'foo', 'z.ipynb'))
242
248
243 nbs = self.nb_api.list('foo').json()
249 nbs = notebooks_only(self.nb_api.list('foo').json())
244 nbnames = set(n['name'] for n in nbs)
250 nbnames = set(n['name'] for n in nbs)
245 self.assertIn('z.ipynb', nbnames)
251 self.assertIn('z.ipynb', nbnames)
246 self.assertNotIn('a.ipynb', nbnames)
252 self.assertNotIn('a.ipynb', nbnames)
247
253
248 def test_rename_existing(self):
254 def test_rename_existing(self):
249 with assert_http_error(409):
255 with assert_http_error(409):
250 self.nb_api.rename('a.ipynb', 'foo', 'b.ipynb')
256 self.nb_api.rename('a.ipynb', 'foo', 'b.ipynb')
251
257
252 def test_save(self):
258 def test_save(self):
253 resp = self.nb_api.read('a.ipynb', 'foo')
259 resp = self.nb_api.read('a.ipynb', 'foo')
254 nbcontent = json.loads(resp.text)['content']
260 nbcontent = json.loads(resp.text)['content']
255 nb = to_notebook_json(nbcontent)
261 nb = to_notebook_json(nbcontent)
256 ws = new_worksheet()
262 ws = new_worksheet()
257 nb.worksheets = [ws]
263 nb.worksheets = [ws]
258 ws.cells.append(new_heading_cell(u'Created by test Β³'))
264 ws.cells.append(new_heading_cell(u'Created by test Β³'))
259
265
260 nbmodel= {'name': 'a.ipynb', 'path':'foo', 'content': nb}
266 nbmodel= {'name': 'a.ipynb', 'path':'foo', 'content': nb}
261 resp = self.nb_api.save('a.ipynb', path='foo', body=json.dumps(nbmodel))
267 resp = self.nb_api.save('a.ipynb', path='foo', body=json.dumps(nbmodel))
262
268
263 nbfile = pjoin(self.notebook_dir.name, 'foo', 'a.ipynb')
269 nbfile = pjoin(self.notebook_dir.name, 'foo', 'a.ipynb')
264 with io.open(nbfile, 'r', encoding='utf-8') as f:
270 with io.open(nbfile, 'r', encoding='utf-8') as f:
265 newnb = read(f, format='ipynb')
271 newnb = read(f, format='ipynb')
266 self.assertEqual(newnb.worksheets[0].cells[0].source,
272 self.assertEqual(newnb.worksheets[0].cells[0].source,
267 u'Created by test Β³')
273 u'Created by test Β³')
268 nbcontent = self.nb_api.read('a.ipynb', 'foo').json()['content']
274 nbcontent = self.nb_api.read('a.ipynb', 'foo').json()['content']
269 newnb = to_notebook_json(nbcontent)
275 newnb = to_notebook_json(nbcontent)
270 self.assertEqual(newnb.worksheets[0].cells[0].source,
276 self.assertEqual(newnb.worksheets[0].cells[0].source,
271 u'Created by test Β³')
277 u'Created by test Β³')
272
278
273 # Save and rename
279 # Save and rename
274 nbmodel= {'name': 'a2.ipynb', 'path':'foo/bar', 'content': nb}
280 nbmodel= {'name': 'a2.ipynb', 'path':'foo/bar', 'content': nb}
275 resp = self.nb_api.save('a.ipynb', path='foo', body=json.dumps(nbmodel))
281 resp = self.nb_api.save('a.ipynb', path='foo', body=json.dumps(nbmodel))
276 saved = resp.json()
282 saved = resp.json()
277 self.assertEqual(saved['name'], 'a2.ipynb')
283 self.assertEqual(saved['name'], 'a2.ipynb')
278 self.assertEqual(saved['path'], 'foo/bar')
284 self.assertEqual(saved['path'], 'foo/bar')
279 assert os.path.isfile(pjoin(self.notebook_dir.name,'foo','bar','a2.ipynb'))
285 assert os.path.isfile(pjoin(self.notebook_dir.name,'foo','bar','a2.ipynb'))
280 assert not os.path.isfile(pjoin(self.notebook_dir.name, 'foo', 'a.ipynb'))
286 assert not os.path.isfile(pjoin(self.notebook_dir.name, 'foo', 'a.ipynb'))
281 with assert_http_error(404):
287 with assert_http_error(404):
282 self.nb_api.read('a.ipynb', 'foo')
288 self.nb_api.read('a.ipynb', 'foo')
283
289
284 def test_checkpoints(self):
290 def test_checkpoints(self):
285 resp = self.nb_api.read('a.ipynb', 'foo')
291 resp = self.nb_api.read('a.ipynb', 'foo')
286 r = self.nb_api.new_checkpoint('a.ipynb', 'foo')
292 r = self.nb_api.new_checkpoint('a.ipynb', 'foo')
287 self.assertEqual(r.status_code, 201)
293 self.assertEqual(r.status_code, 201)
288 cp1 = r.json()
294 cp1 = r.json()
289 self.assertEqual(set(cp1), {'id', 'last_modified'})
295 self.assertEqual(set(cp1), {'id', 'last_modified'})
290 self.assertEqual(r.headers['Location'].split('/')[-1], cp1['id'])
296 self.assertEqual(r.headers['Location'].split('/')[-1], cp1['id'])
291
297
292 # Modify it
298 # Modify it
293 nbcontent = json.loads(resp.text)['content']
299 nbcontent = json.loads(resp.text)['content']
294 nb = to_notebook_json(nbcontent)
300 nb = to_notebook_json(nbcontent)
295 ws = new_worksheet()
301 ws = new_worksheet()
296 nb.worksheets = [ws]
302 nb.worksheets = [ws]
297 hcell = new_heading_cell('Created by test')
303 hcell = new_heading_cell('Created by test')
298 ws.cells.append(hcell)
304 ws.cells.append(hcell)
299 # Save
305 # Save
300 nbmodel= {'name': 'a.ipynb', 'path':'foo', 'content': nb}
306 nbmodel= {'name': 'a.ipynb', 'path':'foo', 'content': nb}
301 resp = self.nb_api.save('a.ipynb', path='foo', body=json.dumps(nbmodel))
307 resp = self.nb_api.save('a.ipynb', path='foo', body=json.dumps(nbmodel))
302
308
303 # List checkpoints
309 # List checkpoints
304 cps = self.nb_api.get_checkpoints('a.ipynb', 'foo').json()
310 cps = self.nb_api.get_checkpoints('a.ipynb', 'foo').json()
305 self.assertEqual(cps, [cp1])
311 self.assertEqual(cps, [cp1])
306
312
307 nbcontent = self.nb_api.read('a.ipynb', 'foo').json()['content']
313 nbcontent = self.nb_api.read('a.ipynb', 'foo').json()['content']
308 nb = to_notebook_json(nbcontent)
314 nb = to_notebook_json(nbcontent)
309 self.assertEqual(nb.worksheets[0].cells[0].source, 'Created by test')
315 self.assertEqual(nb.worksheets[0].cells[0].source, 'Created by test')
310
316
311 # Restore cp1
317 # Restore cp1
312 r = self.nb_api.restore_checkpoint('a.ipynb', 'foo', cp1['id'])
318 r = self.nb_api.restore_checkpoint('a.ipynb', 'foo', cp1['id'])
313 self.assertEqual(r.status_code, 204)
319 self.assertEqual(r.status_code, 204)
314 nbcontent = self.nb_api.read('a.ipynb', 'foo').json()['content']
320 nbcontent = self.nb_api.read('a.ipynb', 'foo').json()['content']
315 nb = to_notebook_json(nbcontent)
321 nb = to_notebook_json(nbcontent)
316 self.assertEqual(nb.worksheets, [])
322 self.assertEqual(nb.worksheets, [])
317
323
318 # Delete cp1
324 # Delete cp1
319 r = self.nb_api.delete_checkpoint('a.ipynb', 'foo', cp1['id'])
325 r = self.nb_api.delete_checkpoint('a.ipynb', 'foo', cp1['id'])
320 self.assertEqual(r.status_code, 204)
326 self.assertEqual(r.status_code, 204)
321 cps = self.nb_api.get_checkpoints('a.ipynb', 'foo').json()
327 cps = self.nb_api.get_checkpoints('a.ipynb', 'foo').json()
322 self.assertEqual(cps, [])
328 self.assertEqual(cps, [])
323
329
General Comments 0
You need to be logged in to leave comments. Login now