##// END OF EJS Templates
clean up of get_os_path and its tests...
Paul Ivanov -
Show More
@@ -1,372 +1,373
1 1 """A notebook manager that uses the local file system for storage.
2 2
3 3 Authors:
4 4
5 5 * Brian Granger
6 6 """
7 7
8 8 #-----------------------------------------------------------------------------
9 9 # Copyright (C) 2011 The IPython Development Team
10 10 #
11 11 # Distributed under the terms of the BSD License. The full license is in
12 12 # the file COPYING, distributed as part of this software.
13 13 #-----------------------------------------------------------------------------
14 14
15 15 #-----------------------------------------------------------------------------
16 16 # Imports
17 17 #-----------------------------------------------------------------------------
18 18
19 19 import datetime
20 20 import io
21 21 import os
22 22 import glob
23 23 import shutil
24 24
25 25 from unicodedata import normalize
26 26
27 27 from tornado import web
28 28
29 29 from .nbmanager import NotebookManager
30 30 from IPython.nbformat import current
31 31 from IPython.utils.traitlets import Unicode, Dict, Bool, TraitError
32 32 from IPython.utils import tz
33 33
34 34 #-----------------------------------------------------------------------------
35 35 # Classes
36 36 #-----------------------------------------------------------------------------
37 37
38 38 class FileNotebookManager(NotebookManager):
39 39
40 40 save_script = Bool(False, config=True,
41 41 help="""Automatically create a Python script when saving the notebook.
42 42
43 43 For easier use of import, %run and %load across notebooks, a
44 44 <notebook-name>.py script will be created next to any
45 45 <notebook-name>.ipynb on each save. This can also be set with the
46 46 short `--script` flag.
47 47 """
48 48 )
49 49
50 50 checkpoint_dir = Unicode(config=True,
51 51 help="""The location in which to keep notebook checkpoints
52 52
53 53 By default, it is notebook-dir/.ipynb_checkpoints
54 54 """
55 55 )
56 56 def _checkpoint_dir_default(self):
57 57 return os.path.join(self.notebook_dir, '.ipynb_checkpoints')
58 58
59 59 def _checkpoint_dir_changed(self, name, old, new):
60 60 """do a bit of validation of the checkpoint dir"""
61 61 if not os.path.isabs(new):
62 62 # If we receive a non-absolute path, make it absolute.
63 63 abs_new = os.path.abspath(new)
64 64 self.checkpoint_dir = abs_new
65 65 return
66 66 if os.path.exists(new) and not os.path.isdir(new):
67 67 raise TraitError("checkpoint dir %r is not a directory" % new)
68 68 if not os.path.exists(new):
69 69 self.log.info("Creating checkpoint dir %s", new)
70 70 try:
71 71 os.mkdir(new)
72 72 except:
73 73 raise TraitError("Couldn't create checkpoint dir %r" % new)
74 74
75 75 filename_ext = Unicode(u'.ipynb')
76 76
77 77
78 78 def get_notebook_names(self, path):
79 79 """List all notebook names in the notebook dir."""
80 80 names = glob.glob(self.get_os_path('*'+self.filename_ext, path))
81 81 names = [os.path.basename(name)
82 82 for name in names]
83 83 return names
84 84
85 85 def list_notebooks(self, path):
86 86 """List all notebooks in the notebook dir."""
87 87 notebook_names = self.get_notebook_names(path)
88 88 notebooks = []
89 89 for name in notebook_names:
90 90 model = self.notebook_model(name, path, content=False)
91 91 notebooks.append(model)
92 92 return notebooks
93 93
94 94 def change_notebook(self, data, notebook_name, notebook_path=None):
95 95 """Changes notebook"""
96 96 changes = data.keys()
97 97 response = 200
98 98 for change in changes:
99 99 full_path = self.get_os_path(notebook_name, notebook_path)
100 100 if change == "name":
101 101 new_path = self.get_os_path(data['name'], notebook_path)
102 102 if not os.path.isfile(new_path):
103 103 os.rename(full_path,
104 104 self.get_os_path(data['name'], notebook_path))
105 105 notebook_name = data['name']
106 106 else:
107 107 response = 409
108 108 if change == "path":
109 109 new_path = self.get_os_path(data['name'], data['path'])
110 110 stutil.move(full_path, new_path)
111 111 notebook_path = data['path']
112 112 if change == "content":
113 113 self.save_notebook(data, notebook_name, notebook_path)
114 114 model = self.notebook_model(notebook_name, notebook_path)
115 115 return model, response
116 116
117 117 def notebook_exists(self, name, path):
118 118 """Returns a True if the notebook exists. Else, returns False.
119 119
120 120 Parameters
121 121 ----------
122 122 name : string
123 123 The name of the notebook you are checking.
124 124 path : string
125 125 The relative path to the notebook (with '/' as separator)
126 126
127 127 Returns
128 128 -------
129 129 bool
130 130 """
131 131 path = self.get_os_path(name, path)
132 132 return os.path.isfile(path)
133 133
134 def get_os_path(self, fname, path=None):
135 """Return a full path to a notebook with the os.sep as the separator.
134 def get_os_path(self, fname, path='/'):
135 """Given a notebook name and a server URL path, return its file system
136 path.
136 137
137 138 Parameters
138 139 ----------
139 140 fname : string
140 141 The name of a notebook file with the .ipynb extension
141 142 path : string
142 The relative path (with '/' as separator) to the named notebook.
143 The relative URL path (with '/' as separator) to the named
144 notebook.
143 145
144 146 Returns
145 147 -------
146 path :
147 A path that combines notebook_dir (location where server started),
148 the relative path, and the filename with the current operating
149 system's url.
148 path : string
149 A file system path that combines notebook_dir (location where
150 server started), the relative path, and the filename with the
151 current operating system's url.
150 152 """
151 if path is None:
152 path = '/'
153 153 parts = path.split('/')
154 154 parts = [p for p in parts if p != ''] # remove duplicate splits
155 path = os.sep.join([self.notebook_dir] + parts + [fname])
155 parts += [fname]
156 path = os.path.join(self.notebook_dir, *parts)
156 157 return path
157 158
158 159 def read_notebook_object_from_path(self, path):
159 160 """read a notebook object from a path"""
160 161 info = os.stat(path)
161 162 last_modified = tz.utcfromtimestamp(info.st_mtime)
162 163 with open(path,'r') as f:
163 164 s = f.read()
164 165 try:
165 166 # v1 and v2 and json in the .ipynb files.
166 167 nb = current.reads(s, u'json')
167 168 except ValueError as e:
168 169 msg = u"Unreadable Notebook: %s" % e
169 170 raise web.HTTPError(400, msg, reason=msg)
170 171 return last_modified, nb
171 172
172 173 def read_notebook_object(self, notebook_name, notebook_path=None):
173 174 """Get the Notebook representation of a notebook by notebook_name."""
174 175 path = self.get_os_path(notebook_name, notebook_path)
175 176 if not os.path.isfile(path):
176 177 raise web.HTTPError(404, u'Notebook does not exist: %s' % notebook_name)
177 178 last_modified, nb = self.read_notebook_object_from_path(path)
178 179 # Always use the filename as the notebook name.
179 180 # Eventually we will get rid of the notebook name in the metadata
180 181 # but for now, that name is just an empty string. Until the notebooks
181 182 # web service knows about names in URLs we still pass the name
182 183 # back to the web app using the metadata though.
183 184 nb.metadata.name = os.path.splitext(os.path.basename(path))[0]
184 185 return last_modified, nb
185 186
186 187 def write_notebook_object(self, nb, notebook_name=None, notebook_path=None, new_name= None):
187 188 """Save an existing notebook object by notebook_name."""
188 189 if new_name == None:
189 190 try:
190 191 new_name = normalize('NFC', nb.metadata.name)
191 192 except AttributeError:
192 193 raise web.HTTPError(400, u'Missing notebook name')
193 194
194 195 new_path = notebook_path
195 196 old_name = notebook_name
196 197 old_checkpoints = self.list_checkpoints(old_name)
197 198
198 199 path = self.get_os_path(new_name, new_path)
199 200
200 201 # Right before we save the notebook, we write an empty string as the
201 202 # notebook name in the metadata. This is to prepare for removing
202 203 # this attribute entirely post 1.0. The web app still uses the metadata
203 204 # name for now.
204 205 nb.metadata.name = u''
205 206
206 207 try:
207 208 self.log.debug("Autosaving notebook %s", path)
208 209 with open(path,'w') as f:
209 210 current.write(nb, f, u'json')
210 211 except Exception as e:
211 212 raise web.HTTPError(400, u'Unexpected error while autosaving notebook: %s' % e)
212 213
213 214 # save .py script as well
214 215 if self.save_script:
215 216 pypath = os.path.splitext(path)[0] + '.py'
216 217 self.log.debug("Writing script %s", pypath)
217 218 try:
218 219 with io.open(pypath,'w', encoding='utf-8') as f:
219 220 current.write(nb, f, u'py')
220 221 except Exception as e:
221 222 raise web.HTTPError(400, u'Unexpected error while saving notebook as script: %s' % e)
222 223
223 224 if old_name != None:
224 225 # remove old files if the name changed
225 226 if old_name != new_name:
226 227 # remove renamed original, if it exists
227 228 old_path = self.get_os_path(old_name, notebook_path)
228 229 if os.path.isfile(old_path):
229 230 self.log.debug("unlinking notebook %s", old_path)
230 231 os.unlink(old_path)
231 232
232 233 # cleanup old script, if it exists
233 234 if self.save_script:
234 235 old_pypath = os.path.splitext(old_path)[0] + '.py'
235 236 if os.path.isfile(old_pypath):
236 237 self.log.debug("unlinking script %s", old_pypath)
237 238 os.unlink(old_pypath)
238 239
239 240 # rename checkpoints to follow file
240 241 for cp in old_checkpoints:
241 242 checkpoint_id = cp['checkpoint_id']
242 243 old_cp_path = self.get_checkpoint_path_by_name(old_name, checkpoint_id)
243 244 new_cp_path = self.get_checkpoint_path_by_name(new_name, checkpoint_id)
244 245 if os.path.isfile(old_cp_path):
245 246 self.log.debug("renaming checkpoint %s -> %s", old_cp_path, new_cp_path)
246 247 os.rename(old_cp_path, new_cp_path)
247 248
248 249 return new_name
249 250
250 251 def delete_notebook(self, notebook_name, notebook_path):
251 252 """Delete notebook by notebook_name."""
252 253 nb_path = self.get_os_path(notebook_name, notebook_path)
253 254 if not os.path.isfile(nb_path):
254 255 raise web.HTTPError(404, u'Notebook does not exist: %s' % notebook_name)
255 256
256 257 # clear checkpoints
257 258 for checkpoint in self.list_checkpoints(notebook_name):
258 259 checkpoint_id = checkpoint['checkpoint_id']
259 260 path = self.get_checkpoint_path(notebook_name, checkpoint_id)
260 261 self.log.debug(path)
261 262 if os.path.isfile(path):
262 263 self.log.debug("unlinking checkpoint %s", path)
263 264 os.unlink(path)
264 265
265 266 self.log.debug("unlinking notebook %s", nb_path)
266 267 os.unlink(nb_path)
267 268
268 269 def increment_filename(self, basename, notebook_path=None):
269 270 """Return a non-used filename of the form basename<int>.
270 271
271 272 This searches through the filenames (basename0, basename1, ...)
272 273 until is find one that is not already being used. It is used to
273 274 create Untitled and Copy names that are unique.
274 275 """
275 276 i = 0
276 277 while True:
277 278 name = u'%s%i.ipynb' % (basename,i)
278 279 path = self.get_os_path(name, notebook_path)
279 280 if not os.path.isfile(path):
280 281 break
281 282 else:
282 283 i = i+1
283 284 return name
284 285
285 286 # Checkpoint-related utilities
286 287
287 288 def get_checkpoint_path_by_name(self, name, checkpoint_id, notebook_path=None):
288 289 """Return a full path to a notebook checkpoint, given its name and checkpoint id."""
289 290 filename = u"{name}-{checkpoint_id}{ext}".format(
290 291 name=name,
291 292 checkpoint_id=checkpoint_id,
292 293 ext=self.filename_ext,
293 294 )
294 295 if notebook_path ==None:
295 296 path = os.path.join(self.checkpoint_dir, filename)
296 297 else:
297 298 path = os.path.join(notebook_path, self.checkpoint_dir, filename)
298 299 return path
299 300
300 301 def get_checkpoint_path(self, notebook_name, checkpoint_id, notebook_path=None):
301 302 """find the path to a checkpoint"""
302 303 name = notebook_name
303 304 return self.get_checkpoint_path_by_name(name, checkpoint_id, notebook_path)
304 305
305 306 def get_checkpoint_info(self, notebook_name, checkpoint_id, notebook_path=None):
306 307 """construct the info dict for a given checkpoint"""
307 308 path = self.get_checkpoint_path(notebook_name, checkpoint_id, notebook_path)
308 309 stats = os.stat(path)
309 310 last_modified = tz.utcfromtimestamp(stats.st_mtime)
310 311 info = dict(
311 312 checkpoint_id = checkpoint_id,
312 313 last_modified = last_modified,
313 314 )
314 315
315 316 return info
316 317
317 318 # public checkpoint API
318 319
319 320 def create_checkpoint(self, notebook_name, notebook_path=None):
320 321 """Create a checkpoint from the current state of a notebook"""
321 322 nb_path = self.get_os_path(notebook_name, notebook_path)
322 323 # only the one checkpoint ID:
323 324 checkpoint_id = u"checkpoint"
324 325 cp_path = self.get_checkpoint_path(notebook_name, checkpoint_id, notebook_path)
325 326 self.log.debug("creating checkpoint for notebook %s", notebook_name)
326 327 if not os.path.exists(self.checkpoint_dir):
327 328 os.mkdir(self.checkpoint_dir)
328 329 shutil.copy2(nb_path, cp_path)
329 330
330 331 # return the checkpoint info
331 332 return self.get_checkpoint_info(notebook_name, checkpoint_id, notebook_path)
332 333
333 334 def list_checkpoints(self, notebook_name, notebook_path=None):
334 335 """list the checkpoints for a given notebook
335 336
336 337 This notebook manager currently only supports one checkpoint per notebook.
337 338 """
338 339 checkpoint_id = "checkpoint"
339 340 path = self.get_checkpoint_path(notebook_name, checkpoint_id, notebook_path)
340 341 if not os.path.exists(path):
341 342 return []
342 343 else:
343 344 return [self.get_checkpoint_info(notebook_name, checkpoint_id, notebook_path)]
344 345
345 346
346 347 def restore_checkpoint(self, notebook_name, checkpoint_id, notebook_path=None):
347 348 """restore a notebook to a checkpointed state"""
348 349 self.log.info("restoring Notebook %s from checkpoint %s", notebook_name, checkpoint_id)
349 350 nb_path = self.get_os_path(notebook_name, notebook_path)
350 351 cp_path = self.get_checkpoint_path(notebook_name, checkpoint_id, notebook_path)
351 352 if not os.path.isfile(cp_path):
352 353 self.log.debug("checkpoint file does not exist: %s", cp_path)
353 354 raise web.HTTPError(404,
354 355 u'Notebook checkpoint does not exist: %s-%s' % (notebook_name, checkpoint_id)
355 356 )
356 357 # ensure notebook is readable (never restore from an unreadable notebook)
357 358 last_modified, nb = self.read_notebook_object_from_path(cp_path)
358 359 shutil.copy2(cp_path, nb_path)
359 360 self.log.debug("copying %s -> %s", cp_path, nb_path)
360 361
361 362 def delete_checkpoint(self, notebook_name, checkpoint_id, notebook_path=None):
362 363 """delete a notebook's checkpoint"""
363 364 path = self.get_checkpoint_path(notebook_name, checkpoint_id, notebook_path)
364 365 if not os.path.isfile(path):
365 366 raise web.HTTPError(404,
366 367 u'Notebook checkpoint does not exist: %s-%s' % (notebook_name, checkpoint_id)
367 368 )
368 369 self.log.debug("unlinking %s", path)
369 370 os.unlink(path)
370 371
371 372 def info_string(self):
372 373 return "Serving notebooks from local directory: %s" % self.notebook_dir
@@ -1,112 +1,111
1 1 """Tests for the notebook manager."""
2 2
3 3 import os
4 4 from unittest import TestCase
5 5 from tempfile import NamedTemporaryFile
6 6
7 7 from IPython.utils.tempdir import TemporaryDirectory
8 8 from IPython.utils.traitlets import TraitError
9 9
10 10 from ..filenbmanager import FileNotebookManager
11 11 from ..nbmanager import NotebookManager
12 12
13 13 class TestFileNotebookManager(TestCase):
14 14
15 15 def test_nb_dir(self):
16 16 with TemporaryDirectory() as td:
17 km = FileNotebookManager(notebook_dir=td)
18 self.assertEqual(km.notebook_dir, td)
17 fm = FileNotebookManager(notebook_dir=td)
18 self.assertEqual(fm.notebook_dir, td)
19 19
20 20 def test_create_nb_dir(self):
21 21 with TemporaryDirectory() as td:
22 22 nbdir = os.path.join(td, 'notebooks')
23 km = FileNotebookManager(notebook_dir=nbdir)
24 self.assertEqual(km.notebook_dir, nbdir)
23 fm = FileNotebookManager(notebook_dir=nbdir)
24 self.assertEqual(fm.notebook_dir, nbdir)
25 25
26 26 def test_missing_nb_dir(self):
27 27 with TemporaryDirectory() as td:
28 28 nbdir = os.path.join(td, 'notebook', 'dir', 'is', 'missing')
29 29 self.assertRaises(TraitError, FileNotebookManager, notebook_dir=nbdir)
30 30
31 31 def test_invalid_nb_dir(self):
32 32 with NamedTemporaryFile() as tf:
33 33 self.assertRaises(TraitError, FileNotebookManager, notebook_dir=tf.name)
34 34
35 35 def test_get_os_path(self):
36 36 # full filesystem path should be returned with correct operating system
37 37 # separators.
38 38 with TemporaryDirectory() as td:
39 39 nbdir = os.path.join(td, 'notebooks')
40 km = FileNotebookManager(notebook_dir=nbdir)
41 path = km.get_os_path('test.ipynb', '/path/to/notebook/')
42 self.assertEqual(path, km.notebook_dir+os.sep+'path'+os.sep+'to'+os.sep+
43 'notebook'+os.sep+'test.ipynb')
44
45 with TemporaryDirectory() as td:
46 nbdir = os.path.join(td, 'notebooks')
47 km = FileNotebookManager(notebook_dir=nbdir)
48 path = km.get_os_path('test.ipynb', None)
49 self.assertEqual(path, km.notebook_dir+os.sep+'test.ipynb')
50
51 with TemporaryDirectory() as td:
52 nbdir = os.path.join(td, 'notebooks')
53 km = FileNotebookManager(notebook_dir=nbdir)
54 path = km.get_os_path('test.ipynb', '////')
55 self.assertEqual(path, km.notebook_dir+os.sep+'test.ipynb')
40 fm = FileNotebookManager(notebook_dir=nbdir)
41 path = fm.get_os_path('test.ipynb', '/path/to/notebook/')
42 rel_path_list = '/path/to/notebook/test.ipynb'.split('/')
43 fs_path = os.path.join(fm.notebook_dir, *rel_path_list)
44 self.assertEqual(path, fs_path)
45
46 fm = FileNotebookManager(notebook_dir=nbdir)
47 path = fm.get_os_path('test.ipynb')
48 fs_path = os.path.join(fm.notebook_dir, 'test.ipynb')
49 self.assertEqual(path, fs_path)
50
51 fm = FileNotebookManager(notebook_dir=nbdir)
52 path = fm.get_os_path('test.ipynb', '////')
53 fs_path = os.path.join(fm.notebook_dir, 'test.ipynb')
54 self.assertEqual(path, fs_path)
56 55
57 56 class TestNotebookManager(TestCase):
58 57 def test_named_notebook_path(self):
59 58 nm = NotebookManager()
60 59
61 60 # doesn't end with ipynb, should just be path
62 61 name, path = nm.named_notebook_path('hello')
63 62 self.assertEqual(name, None)
64 63 self.assertEqual(path, '/hello/')
65 64
66 65 name, path = nm.named_notebook_path('/')
67 66 self.assertEqual(name, None)
68 67 self.assertEqual(path, '/')
69 68
70 69 name, path = nm.named_notebook_path('hello.ipynb')
71 70 self.assertEqual(name, 'hello.ipynb')
72 71 self.assertEqual(path, '/')
73 72
74 73 name, path = nm.named_notebook_path('/hello.ipynb')
75 74 self.assertEqual(name, 'hello.ipynb')
76 75 self.assertEqual(path, '/')
77 76
78 77 name, path = nm.named_notebook_path('/this/is/a/path/hello.ipynb')
79 78 self.assertEqual(name, 'hello.ipynb')
80 79 self.assertEqual(path, '/this/is/a/path/')
81 80
82 81 name, path = nm.named_notebook_path('path/without/leading/slash/hello.ipynb')
83 82 self.assertEqual(name, 'hello.ipynb')
84 83 self.assertEqual(path, '/path/without/leading/slash/')
85 84
86 85 def test_url_encode(self):
87 86 nm = NotebookManager()
88 87
89 88 # changes path or notebook name with special characters to url encoding
90 89 # these tests specifically encode paths with spaces
91 90 path = nm.url_encode('/this is a test/for spaces/')
92 91 self.assertEqual(path, '/this%20is%20a%20test/for%20spaces/')
93 92
94 93 path = nm.url_encode('notebook with space.ipynb')
95 94 self.assertEqual(path, 'notebook%20with%20space.ipynb')
96 95
97 96 path = nm.url_encode('/path with a/notebook and space.ipynb')
98 97 self.assertEqual(path, '/path%20with%20a/notebook%20and%20space.ipynb')
99 98
100 99 def test_url_decode(self):
101 100 nm = NotebookManager()
102 101
103 102 # decodes a url string to a plain string
104 103 # these tests decode paths with spaces
105 104 path = nm.url_decode('/this%20is%20a%20test/for%20spaces/')
106 105 self.assertEqual(path, '/this is a test/for spaces/')
107 106
108 107 path = nm.url_decode('notebook%20with%20space.ipynb')
109 108 self.assertEqual(path, 'notebook with space.ipynb')
110 109
111 110 path = nm.url_decode('/path%20with%20a/notebook%20and%20space.ipynb')
112 111 self.assertEqual(path, '/path with a/notebook and space.ipynb')
General Comments 0
You need to be logged in to leave comments. Login now