##// END OF EJS Templates
support deleting empty directories...
MinRK -
Show More
@@ -1,496 +1,506 b''
1 """A contents manager that uses the local file system for storage."""
1 """A contents manager that uses the local file system for storage."""
2
2
3 # Copyright (c) IPython Development Team.
3 # Copyright (c) IPython Development Team.
4 # Distributed under the terms of the Modified BSD License.
4 # Distributed under the terms of the Modified BSD License.
5
5
6 import base64
6 import base64
7 import io
7 import io
8 import os
8 import os
9 import glob
9 import glob
10 import shutil
10 import shutil
11
11
12 from tornado import web
12 from tornado import web
13
13
14 from .manager import ContentsManager
14 from .manager import ContentsManager
15 from IPython.nbformat import current
15 from IPython.nbformat import current
16 from IPython.utils.path import ensure_dir_exists
16 from IPython.utils.path import ensure_dir_exists
17 from IPython.utils.traitlets import Unicode, Bool, TraitError
17 from IPython.utils.traitlets import Unicode, Bool, TraitError
18 from IPython.utils.py3compat import getcwd
18 from IPython.utils.py3compat import getcwd
19 from IPython.utils import tz
19 from IPython.utils import tz
20 from IPython.html.utils import is_hidden, to_os_path
20 from IPython.html.utils import is_hidden, to_os_path
21
21
22
22
23 class FileContentsManager(ContentsManager):
23 class FileContentsManager(ContentsManager):
24
24
25 root_dir = Unicode(getcwd(), config=True)
25 root_dir = Unicode(getcwd(), config=True)
26
26
27 def _root_dir_changed(self, name, old, new):
27 def _root_dir_changed(self, name, old, new):
28 """Do a bit of validation of the root_dir."""
28 """Do a bit of validation of the root_dir."""
29 if not os.path.isabs(new):
29 if not os.path.isabs(new):
30 # If we receive a non-absolute path, make it absolute.
30 # If we receive a non-absolute path, make it absolute.
31 self.root_dir = os.path.abspath(new)
31 self.root_dir = os.path.abspath(new)
32 return
32 return
33 if not os.path.exists(new) or not os.path.isdir(new):
33 if not os.path.exists(new) or not os.path.isdir(new):
34 raise TraitError("%r is not a directory" % new)
34 raise TraitError("%r is not a directory" % new)
35
35
36 checkpoint_dir = Unicode('.ipynb_checkpoints', config=True,
36 checkpoint_dir = Unicode('.ipynb_checkpoints', config=True,
37 help="""The directory name in which to keep file checkpoints
37 help="""The directory name in which to keep file checkpoints
38
38
39 This is a path relative to the file's own directory.
39 This is a path relative to the file's own directory.
40
40
41 By default, it is .ipynb_checkpoints
41 By default, it is .ipynb_checkpoints
42 """
42 """
43 )
43 )
44
44
45 def _copy(self, src, dest):
45 def _copy(self, src, dest):
46 """copy src to dest
46 """copy src to dest
47
47
48 like shutil.copy2, but log errors in copystat
48 like shutil.copy2, but log errors in copystat
49 """
49 """
50 shutil.copyfile(src, dest)
50 shutil.copyfile(src, dest)
51 try:
51 try:
52 shutil.copystat(src, dest)
52 shutil.copystat(src, dest)
53 except OSError as e:
53 except OSError as e:
54 self.log.debug("copystat on %s failed", dest, exc_info=True)
54 self.log.debug("copystat on %s failed", dest, exc_info=True)
55
55
56 def _get_os_path(self, name=None, path=''):
56 def _get_os_path(self, name=None, path=''):
57 """Given a filename and a URL path, return its file system
57 """Given a filename and a URL path, return its file system
58 path.
58 path.
59
59
60 Parameters
60 Parameters
61 ----------
61 ----------
62 name : string
62 name : string
63 A filename
63 A filename
64 path : string
64 path : string
65 The relative URL path (with '/' as separator) to the named
65 The relative URL path (with '/' as separator) to the named
66 file.
66 file.
67
67
68 Returns
68 Returns
69 -------
69 -------
70 path : string
70 path : string
71 API path to be evaluated relative to root_dir.
71 API path to be evaluated relative to root_dir.
72 """
72 """
73 if name is not None:
73 if name is not None:
74 path = path + '/' + name
74 path = path + '/' + name
75 return to_os_path(path, self.root_dir)
75 return to_os_path(path, self.root_dir)
76
76
77 def path_exists(self, path):
77 def path_exists(self, path):
78 """Does the API-style path refer to an extant directory?
78 """Does the API-style path refer to an extant directory?
79
79
80 Parameters
80 Parameters
81 ----------
81 ----------
82 path : string
82 path : string
83 The path to check. This is an API path (`/` separated,
83 The path to check. This is an API path (`/` separated,
84 relative to root_dir).
84 relative to root_dir).
85
85
86 Returns
86 Returns
87 -------
87 -------
88 exists : bool
88 exists : bool
89 Whether the path is indeed a directory.
89 Whether the path is indeed a directory.
90 """
90 """
91 path = path.strip('/')
91 path = path.strip('/')
92 os_path = self._get_os_path(path=path)
92 os_path = self._get_os_path(path=path)
93 return os.path.isdir(os_path)
93 return os.path.isdir(os_path)
94
94
95 def is_hidden(self, path):
95 def is_hidden(self, path):
96 """Does the API style path correspond to a hidden directory or file?
96 """Does the API style path correspond to a hidden directory or file?
97
97
98 Parameters
98 Parameters
99 ----------
99 ----------
100 path : string
100 path : string
101 The path to check. This is an API path (`/` separated,
101 The path to check. This is an API path (`/` separated,
102 relative to root_dir).
102 relative to root_dir).
103
103
104 Returns
104 Returns
105 -------
105 -------
106 exists : bool
106 exists : bool
107 Whether the path is hidden.
107 Whether the path is hidden.
108
108
109 """
109 """
110 path = path.strip('/')
110 path = path.strip('/')
111 os_path = self._get_os_path(path=path)
111 os_path = self._get_os_path(path=path)
112 return is_hidden(os_path, self.root_dir)
112 return is_hidden(os_path, self.root_dir)
113
113
114 def file_exists(self, name, path=''):
114 def file_exists(self, name, path=''):
115 """Returns True if the file exists, else returns False.
115 """Returns True if the file exists, else returns False.
116
116
117 Parameters
117 Parameters
118 ----------
118 ----------
119 name : string
119 name : string
120 The name of the file you are checking.
120 The name of the file you are checking.
121 path : string
121 path : string
122 The relative path to the file's directory (with '/' as separator)
122 The relative path to the file's directory (with '/' as separator)
123
123
124 Returns
124 Returns
125 -------
125 -------
126 bool
126 bool
127 """
127 """
128 path = path.strip('/')
128 path = path.strip('/')
129 nbpath = self._get_os_path(name, path=path)
129 nbpath = self._get_os_path(name, path=path)
130 return os.path.isfile(nbpath)
130 return os.path.isfile(nbpath)
131
131
132 def exists(self, name=None, path=''):
132 def exists(self, name=None, path=''):
133 """Returns True if the path [and name] exists, else returns False.
133 """Returns True if the path [and name] exists, else returns False.
134
134
135 Parameters
135 Parameters
136 ----------
136 ----------
137 name : string
137 name : string
138 The name of the file you are checking.
138 The name of the file you are checking.
139 path : string
139 path : string
140 The relative path to the file's directory (with '/' as separator)
140 The relative path to the file's directory (with '/' as separator)
141
141
142 Returns
142 Returns
143 -------
143 -------
144 bool
144 bool
145 """
145 """
146 path = path.strip('/')
146 path = path.strip('/')
147 os_path = self._get_os_path(name, path=path)
147 os_path = self._get_os_path(name, path=path)
148 return os.path.exists(os_path)
148 return os.path.exists(os_path)
149
149
150 def _base_model(self, name, path=''):
150 def _base_model(self, name, path=''):
151 """Build the common base of a contents model"""
151 """Build the common base of a contents model"""
152 os_path = self._get_os_path(name, path)
152 os_path = self._get_os_path(name, path)
153 info = os.stat(os_path)
153 info = os.stat(os_path)
154 last_modified = tz.utcfromtimestamp(info.st_mtime)
154 last_modified = tz.utcfromtimestamp(info.st_mtime)
155 created = tz.utcfromtimestamp(info.st_ctime)
155 created = tz.utcfromtimestamp(info.st_ctime)
156 # Create the base model.
156 # Create the base model.
157 model = {}
157 model = {}
158 model['name'] = name
158 model['name'] = name
159 model['path'] = path
159 model['path'] = path
160 model['last_modified'] = last_modified
160 model['last_modified'] = last_modified
161 model['created'] = created
161 model['created'] = created
162 model['content'] = None
162 model['content'] = None
163 model['format'] = None
163 model['format'] = None
164 return model
164 return model
165
165
166 def _dir_model(self, name, path='', content=True):
166 def _dir_model(self, name, path='', content=True):
167 """Build a model for a directory
167 """Build a model for a directory
168
168
169 if content is requested, will include a listing of the directory
169 if content is requested, will include a listing of the directory
170 """
170 """
171 os_path = self._get_os_path(name, path)
171 os_path = self._get_os_path(name, path)
172
172
173 if not os.path.isdir(os_path):
173 if not os.path.isdir(os_path):
174 raise web.HTTPError(404, u'directory does not exist: %r' % os_path)
174 raise web.HTTPError(404, u'directory does not exist: %r' % os_path)
175 elif is_hidden(os_path, self.root_dir):
175 elif is_hidden(os_path, self.root_dir):
176 self.log.info("Refusing to serve hidden directory, via 404 Error")
176 self.log.info("Refusing to serve hidden directory, via 404 Error")
177 raise web.HTTPError(404, u'directory does not exist: %r' % os_path)
177 raise web.HTTPError(404, u'directory does not exist: %r' % os_path)
178
178
179 if name is None:
179 if name is None:
180 if '/' in path:
180 if '/' in path:
181 path, name = path.rsplit('/', 1)
181 path, name = path.rsplit('/', 1)
182 else:
182 else:
183 name = ''
183 name = ''
184 model = self._base_model(name, path)
184 model = self._base_model(name, path)
185 model['type'] = 'directory'
185 model['type'] = 'directory'
186 dir_path = u'{}/{}'.format(path, name)
186 dir_path = u'{}/{}'.format(path, name)
187 if content:
187 if content:
188 model['content'] = contents = []
188 model['content'] = contents = []
189 for os_path in glob.glob(self._get_os_path('*', dir_path)):
189 for os_path in glob.glob(self._get_os_path('*', dir_path)):
190 name = os.path.basename(os_path)
190 name = os.path.basename(os_path)
191 if self.should_list(name) and not is_hidden(os_path, self.root_dir):
191 if self.should_list(name) and not is_hidden(os_path, self.root_dir):
192 contents.append(self.get_model(name=name, path=dir_path, content=False))
192 contents.append(self.get_model(name=name, path=dir_path, content=False))
193
193
194 model['format'] = 'json'
194 model['format'] = 'json'
195
195
196 return model
196 return model
197
197
198 def _file_model(self, name, path='', content=True):
198 def _file_model(self, name, path='', content=True):
199 """Build a model for a file
199 """Build a model for a file
200
200
201 if content is requested, include the file contents.
201 if content is requested, include the file contents.
202 UTF-8 text files will be unicode, binary files will be base64-encoded.
202 UTF-8 text files will be unicode, binary files will be base64-encoded.
203 """
203 """
204 model = self._base_model(name, path)
204 model = self._base_model(name, path)
205 model['type'] = 'file'
205 model['type'] = 'file'
206 if content:
206 if content:
207 os_path = self._get_os_path(name, path)
207 os_path = self._get_os_path(name, path)
208 try:
208 try:
209 with io.open(os_path, 'r', encoding='utf-8') as f:
209 with io.open(os_path, 'r', encoding='utf-8') as f:
210 model['content'] = f.read()
210 model['content'] = f.read()
211 except UnicodeError as e:
211 except UnicodeError as e:
212 with io.open(os_path, 'rb') as f:
212 with io.open(os_path, 'rb') as f:
213 bcontent = f.read()
213 bcontent = f.read()
214 model['content'] = base64.encodestring(bcontent).decode('ascii')
214 model['content'] = base64.encodestring(bcontent).decode('ascii')
215 model['format'] = 'base64'
215 model['format'] = 'base64'
216 else:
216 else:
217 model['format'] = 'text'
217 model['format'] = 'text'
218 return model
218 return model
219
219
220
220
221 def _notebook_model(self, name, path='', content=True):
221 def _notebook_model(self, name, path='', content=True):
222 """Build a notebook model
222 """Build a notebook model
223
223
224 if content is requested, the notebook content will be populated
224 if content is requested, the notebook content will be populated
225 as a JSON structure (not double-serialized)
225 as a JSON structure (not double-serialized)
226 """
226 """
227 model = self._base_model(name, path)
227 model = self._base_model(name, path)
228 model['type'] = 'notebook'
228 model['type'] = 'notebook'
229 if content:
229 if content:
230 os_path = self._get_os_path(name, path)
230 os_path = self._get_os_path(name, path)
231 with io.open(os_path, 'r', encoding='utf-8') as f:
231 with io.open(os_path, 'r', encoding='utf-8') as f:
232 try:
232 try:
233 nb = current.read(f, u'json')
233 nb = current.read(f, u'json')
234 except Exception as e:
234 except Exception as e:
235 raise web.HTTPError(400, u"Unreadable Notebook: %s %s" % (os_path, e))
235 raise web.HTTPError(400, u"Unreadable Notebook: %s %s" % (os_path, e))
236 self.mark_trusted_cells(nb, name, path)
236 self.mark_trusted_cells(nb, name, path)
237 model['content'] = nb
237 model['content'] = nb
238 model['format'] = 'json'
238 model['format'] = 'json'
239 return model
239 return model
240
240
241 def get_model(self, name, path='', content=True):
241 def get_model(self, name, path='', content=True):
242 """ Takes a path and name for an entity and returns its model
242 """ Takes a path and name for an entity and returns its model
243
243
244 Parameters
244 Parameters
245 ----------
245 ----------
246 name : str
246 name : str
247 the name of the target
247 the name of the target
248 path : str
248 path : str
249 the URL path that describes the relative path for the target
249 the URL path that describes the relative path for the target
250
250
251 Returns
251 Returns
252 -------
252 -------
253 model : dict
253 model : dict
254 the contents model. If content=True, returns the contents
254 the contents model. If content=True, returns the contents
255 of the file or directory as well.
255 of the file or directory as well.
256 """
256 """
257 path = path.strip('/')
257 path = path.strip('/')
258
258
259 if not self.exists(name=name, path=path):
259 if not self.exists(name=name, path=path):
260 raise web.HTTPError(404, u'No such file or directory: %s/%s' % (path, name))
260 raise web.HTTPError(404, u'No such file or directory: %s/%s' % (path, name))
261
261
262 os_path = self._get_os_path(name, path)
262 os_path = self._get_os_path(name, path)
263 if os.path.isdir(os_path):
263 if os.path.isdir(os_path):
264 model = self._dir_model(name, path, content)
264 model = self._dir_model(name, path, content)
265 elif name.endswith('.ipynb'):
265 elif name.endswith('.ipynb'):
266 model = self._notebook_model(name, path, content)
266 model = self._notebook_model(name, path, content)
267 else:
267 else:
268 model = self._file_model(name, path, content)
268 model = self._file_model(name, path, content)
269 return model
269 return model
270
270
271 def _save_notebook(self, os_path, model, name='', path=''):
271 def _save_notebook(self, os_path, model, name='', path=''):
272 """save a notebook file"""
272 """save a notebook file"""
273 # Save the notebook file
273 # Save the notebook file
274 nb = current.to_notebook_json(model['content'])
274 nb = current.to_notebook_json(model['content'])
275
275
276 self.check_and_sign(nb, name, path)
276 self.check_and_sign(nb, name, path)
277
277
278 if 'name' in nb['metadata']:
278 if 'name' in nb['metadata']:
279 nb['metadata']['name'] = u''
279 nb['metadata']['name'] = u''
280
280
281 with io.open(os_path, 'w', encoding='utf-8') as f:
281 with io.open(os_path, 'w', encoding='utf-8') as f:
282 current.write(nb, f, u'json')
282 current.write(nb, f, u'json')
283
283
284 def _save_file(self, os_path, model, name='', path=''):
284 def _save_file(self, os_path, model, name='', path=''):
285 """save a non-notebook file"""
285 """save a non-notebook file"""
286 fmt = model.get('format', None)
286 fmt = model.get('format', None)
287 if fmt not in {'text', 'base64'}:
287 if fmt not in {'text', 'base64'}:
288 raise web.HTTPError(400, "Must specify format of file contents as 'text' or 'base64'")
288 raise web.HTTPError(400, "Must specify format of file contents as 'text' or 'base64'")
289 try:
289 try:
290 content = model['content']
290 content = model['content']
291 if fmt == 'text':
291 if fmt == 'text':
292 bcontent = content.encode('utf8')
292 bcontent = content.encode('utf8')
293 else:
293 else:
294 b64_bytes = content.encode('ascii')
294 b64_bytes = content.encode('ascii')
295 bcontent = base64.decodestring(b64_bytes)
295 bcontent = base64.decodestring(b64_bytes)
296 except Exception as e:
296 except Exception as e:
297 raise web.HTTPError(400, u'Encoding error saving %s: %s' % (os_path, e))
297 raise web.HTTPError(400, u'Encoding error saving %s: %s' % (os_path, e))
298 with io.open(os_path, 'wb') as f:
298 with io.open(os_path, 'wb') as f:
299 f.write(bcontent)
299 f.write(bcontent)
300
300
301 def _save_directory(self, os_path, model, name='', path=''):
301 def _save_directory(self, os_path, model, name='', path=''):
302 """create a directory"""
302 """create a directory"""
303 if not os.path.exists(os_path):
303 if not os.path.exists(os_path):
304 os.mkdir(os_path)
304 os.mkdir(os_path)
305 elif not os.path.isdir(os_path):
305 elif not os.path.isdir(os_path):
306 raise web.HTTPError(400, u'Not a directory: %s' % (os_path))
306 raise web.HTTPError(400, u'Not a directory: %s' % (os_path))
307
307
308 def save(self, model, name='', path=''):
308 def save(self, model, name='', path=''):
309 """Save the file model and return the model with no content."""
309 """Save the file model and return the model with no content."""
310 path = path.strip('/')
310 path = path.strip('/')
311
311
312 if 'content' not in model:
312 if 'content' not in model:
313 raise web.HTTPError(400, u'No file content provided')
313 raise web.HTTPError(400, u'No file content provided')
314 if 'type' not in model:
314 if 'type' not in model:
315 raise web.HTTPError(400, u'No file type provided')
315 raise web.HTTPError(400, u'No file type provided')
316
316
317 # One checkpoint should always exist
317 # One checkpoint should always exist
318 if self.file_exists(name, path) and not self.list_checkpoints(name, path):
318 if self.file_exists(name, path) and not self.list_checkpoints(name, path):
319 self.create_checkpoint(name, path)
319 self.create_checkpoint(name, path)
320
320
321 new_path = model.get('path', path).strip('/')
321 new_path = model.get('path', path).strip('/')
322 new_name = model.get('name', name)
322 new_name = model.get('name', name)
323
323
324 if path != new_path or name != new_name:
324 if path != new_path or name != new_name:
325 self.rename(name, path, new_name, new_path)
325 self.rename(name, path, new_name, new_path)
326
326
327 os_path = self._get_os_path(new_name, new_path)
327 os_path = self._get_os_path(new_name, new_path)
328 self.log.debug("Saving %s", os_path)
328 self.log.debug("Saving %s", os_path)
329 try:
329 try:
330 if model['type'] == 'notebook':
330 if model['type'] == 'notebook':
331 self._save_notebook(os_path, model, new_name, new_path)
331 self._save_notebook(os_path, model, new_name, new_path)
332 elif model['type'] == 'file':
332 elif model['type'] == 'file':
333 self._save_file(os_path, model, new_name, new_path)
333 self._save_file(os_path, model, new_name, new_path)
334 elif model['type'] == 'directory':
334 elif model['type'] == 'directory':
335 self._save_directory(os_path, model, new_name, new_path)
335 self._save_directory(os_path, model, new_name, new_path)
336 else:
336 else:
337 raise web.HTTPError(400, "Unhandled contents type: %s" % model['type'])
337 raise web.HTTPError(400, "Unhandled contents type: %s" % model['type'])
338 except web.HTTPError:
338 except web.HTTPError:
339 raise
339 raise
340 except Exception as e:
340 except Exception as e:
341 raise web.HTTPError(400, u'Unexpected error while saving file: %s %s' % (os_path, e))
341 raise web.HTTPError(400, u'Unexpected error while saving file: %s %s' % (os_path, e))
342
342
343 model = self.get_model(new_name, new_path, content=False)
343 model = self.get_model(new_name, new_path, content=False)
344 return model
344 return model
345
345
346 def update(self, model, name, path=''):
346 def update(self, model, name, path=''):
347 """Update the file's path and/or name"""
347 """Update the file's path and/or name"""
348 path = path.strip('/')
348 path = path.strip('/')
349 new_name = model.get('name', name)
349 new_name = model.get('name', name)
350 new_path = model.get('path', path).strip('/')
350 new_path = model.get('path', path).strip('/')
351 if path != new_path or name != new_name:
351 if path != new_path or name != new_name:
352 self.rename(name, path, new_name, new_path)
352 self.rename(name, path, new_name, new_path)
353 model = self.get_model(new_name, new_path, content=False)
353 model = self.get_model(new_name, new_path, content=False)
354 return model
354 return model
355
355
356 def delete(self, name, path=''):
356 def delete(self, name, path=''):
357 """Delete file by name and path."""
357 """Delete file by name and path."""
358 path = path.strip('/')
358 path = path.strip('/')
359 os_path = self._get_os_path(name, path)
359 os_path = self._get_os_path(name, path)
360 if not os.path.isfile(os_path):
360 rm = os.unlink
361 if os.path.isdir(os_path):
362 listing = os.listdir(os_path)
363 # don't delete non-empty directories (checkpoints dir doesn't count)
364 if listing and listing != ['.ipynb_checkpoints']:
365 raise web.HTTPError(400, u'Directory %s not empty' % os_path)
366 elif not os.path.isfile(os_path):
361 raise web.HTTPError(404, u'File does not exist: %s' % os_path)
367 raise web.HTTPError(404, u'File does not exist: %s' % os_path)
362
368
363 # clear checkpoints
369 # clear checkpoints
364 for checkpoint in self.list_checkpoints(name, path):
370 for checkpoint in self.list_checkpoints(name, path):
365 checkpoint_id = checkpoint['id']
371 checkpoint_id = checkpoint['id']
366 cp_path = self.get_checkpoint_path(checkpoint_id, name, path)
372 cp_path = self.get_checkpoint_path(checkpoint_id, name, path)
367 if os.path.isfile(cp_path):
373 if os.path.isfile(cp_path):
368 self.log.debug("Unlinking checkpoint %s", cp_path)
374 self.log.debug("Unlinking checkpoint %s", cp_path)
369 os.unlink(cp_path)
375 os.unlink(cp_path)
370
376
371 self.log.debug("Unlinking file %s", os_path)
377 if os.path.isdir(os_path):
372 os.unlink(os_path)
378 self.log.debug("Removing directory %s", os_path)
379 shutil.rmtree(os_path)
380 else:
381 self.log.debug("Unlinking file %s", os_path)
382 rm(os_path)
373
383
374 def rename(self, old_name, old_path, new_name, new_path):
384 def rename(self, old_name, old_path, new_name, new_path):
375 """Rename a file."""
385 """Rename a file."""
376 old_path = old_path.strip('/')
386 old_path = old_path.strip('/')
377 new_path = new_path.strip('/')
387 new_path = new_path.strip('/')
378 if new_name == old_name and new_path == old_path:
388 if new_name == old_name and new_path == old_path:
379 return
389 return
380
390
381 new_os_path = self._get_os_path(new_name, new_path)
391 new_os_path = self._get_os_path(new_name, new_path)
382 old_os_path = self._get_os_path(old_name, old_path)
392 old_os_path = self._get_os_path(old_name, old_path)
383
393
384 # Should we proceed with the move?
394 # Should we proceed with the move?
385 if os.path.isfile(new_os_path):
395 if os.path.isfile(new_os_path):
386 raise web.HTTPError(409, u'Notebook with name already exists: %s' % new_os_path)
396 raise web.HTTPError(409, u'Notebook with name already exists: %s' % new_os_path)
387
397
388 # Move the file
398 # Move the file
389 try:
399 try:
390 shutil.move(old_os_path, new_os_path)
400 shutil.move(old_os_path, new_os_path)
391 except Exception as e:
401 except Exception as e:
392 raise web.HTTPError(500, u'Unknown error renaming file: %s %s' % (old_os_path, e))
402 raise web.HTTPError(500, u'Unknown error renaming file: %s %s' % (old_os_path, e))
393
403
394 # Move the checkpoints
404 # Move the checkpoints
395 old_checkpoints = self.list_checkpoints(old_name, old_path)
405 old_checkpoints = self.list_checkpoints(old_name, old_path)
396 for cp in old_checkpoints:
406 for cp in old_checkpoints:
397 checkpoint_id = cp['id']
407 checkpoint_id = cp['id']
398 old_cp_path = self.get_checkpoint_path(checkpoint_id, old_name, old_path)
408 old_cp_path = self.get_checkpoint_path(checkpoint_id, old_name, old_path)
399 new_cp_path = self.get_checkpoint_path(checkpoint_id, new_name, new_path)
409 new_cp_path = self.get_checkpoint_path(checkpoint_id, new_name, new_path)
400 if os.path.isfile(old_cp_path):
410 if os.path.isfile(old_cp_path):
401 self.log.debug("Renaming checkpoint %s -> %s", old_cp_path, new_cp_path)
411 self.log.debug("Renaming checkpoint %s -> %s", old_cp_path, new_cp_path)
402 shutil.move(old_cp_path, new_cp_path)
412 shutil.move(old_cp_path, new_cp_path)
403
413
404 # Checkpoint-related utilities
414 # Checkpoint-related utilities
405
415
406 def get_checkpoint_path(self, checkpoint_id, name, path=''):
416 def get_checkpoint_path(self, checkpoint_id, name, path=''):
407 """find the path to a checkpoint"""
417 """find the path to a checkpoint"""
408 path = path.strip('/')
418 path = path.strip('/')
409 basename, ext = os.path.splitext(name)
419 basename, ext = os.path.splitext(name)
410 filename = u"{name}-{checkpoint_id}{ext}".format(
420 filename = u"{name}-{checkpoint_id}{ext}".format(
411 name=basename,
421 name=basename,
412 checkpoint_id=checkpoint_id,
422 checkpoint_id=checkpoint_id,
413 ext=ext,
423 ext=ext,
414 )
424 )
415 os_path = self._get_os_path(path=path)
425 os_path = self._get_os_path(path=path)
416 cp_dir = os.path.join(os_path, self.checkpoint_dir)
426 cp_dir = os.path.join(os_path, self.checkpoint_dir)
417 ensure_dir_exists(cp_dir)
427 ensure_dir_exists(cp_dir)
418 cp_path = os.path.join(cp_dir, filename)
428 cp_path = os.path.join(cp_dir, filename)
419 return cp_path
429 return cp_path
420
430
421 def get_checkpoint_model(self, checkpoint_id, name, path=''):
431 def get_checkpoint_model(self, checkpoint_id, name, path=''):
422 """construct the info dict for a given checkpoint"""
432 """construct the info dict for a given checkpoint"""
423 path = path.strip('/')
433 path = path.strip('/')
424 cp_path = self.get_checkpoint_path(checkpoint_id, name, path)
434 cp_path = self.get_checkpoint_path(checkpoint_id, name, path)
425 stats = os.stat(cp_path)
435 stats = os.stat(cp_path)
426 last_modified = tz.utcfromtimestamp(stats.st_mtime)
436 last_modified = tz.utcfromtimestamp(stats.st_mtime)
427 info = dict(
437 info = dict(
428 id = checkpoint_id,
438 id = checkpoint_id,
429 last_modified = last_modified,
439 last_modified = last_modified,
430 )
440 )
431 return info
441 return info
432
442
433 # public checkpoint API
443 # public checkpoint API
434
444
435 def create_checkpoint(self, name, path=''):
445 def create_checkpoint(self, name, path=''):
436 """Create a checkpoint from the current state of a file"""
446 """Create a checkpoint from the current state of a file"""
437 path = path.strip('/')
447 path = path.strip('/')
438 src_path = self._get_os_path(name, path)
448 src_path = self._get_os_path(name, path)
439 # only the one checkpoint ID:
449 # only the one checkpoint ID:
440 checkpoint_id = u"checkpoint"
450 checkpoint_id = u"checkpoint"
441 cp_path = self.get_checkpoint_path(checkpoint_id, name, path)
451 cp_path = self.get_checkpoint_path(checkpoint_id, name, path)
442 self.log.debug("creating checkpoint for %s", name)
452 self.log.debug("creating checkpoint for %s", name)
443 self._copy(src_path, cp_path)
453 self._copy(src_path, cp_path)
444
454
445 # return the checkpoint info
455 # return the checkpoint info
446 return self.get_checkpoint_model(checkpoint_id, name, path)
456 return self.get_checkpoint_model(checkpoint_id, name, path)
447
457
448 def list_checkpoints(self, name, path=''):
458 def list_checkpoints(self, name, path=''):
449 """list the checkpoints for a given file
459 """list the checkpoints for a given file
450
460
451 This contents manager currently only supports one checkpoint per file.
461 This contents manager currently only supports one checkpoint per file.
452 """
462 """
453 path = path.strip('/')
463 path = path.strip('/')
454 checkpoint_id = "checkpoint"
464 checkpoint_id = "checkpoint"
455 os_path = self.get_checkpoint_path(checkpoint_id, name, path)
465 os_path = self.get_checkpoint_path(checkpoint_id, name, path)
456 if not os.path.exists(os_path):
466 if not os.path.exists(os_path):
457 return []
467 return []
458 else:
468 else:
459 return [self.get_checkpoint_model(checkpoint_id, name, path)]
469 return [self.get_checkpoint_model(checkpoint_id, name, path)]
460
470
461
471
462 def restore_checkpoint(self, checkpoint_id, name, path=''):
472 def restore_checkpoint(self, checkpoint_id, name, path=''):
463 """restore a file to a checkpointed state"""
473 """restore a file to a checkpointed state"""
464 path = path.strip('/')
474 path = path.strip('/')
465 self.log.info("restoring %s from checkpoint %s", name, checkpoint_id)
475 self.log.info("restoring %s from checkpoint %s", name, checkpoint_id)
466 nb_path = self._get_os_path(name, path)
476 nb_path = self._get_os_path(name, path)
467 cp_path = self.get_checkpoint_path(checkpoint_id, name, path)
477 cp_path = self.get_checkpoint_path(checkpoint_id, name, path)
468 if not os.path.isfile(cp_path):
478 if not os.path.isfile(cp_path):
469 self.log.debug("checkpoint file does not exist: %s", cp_path)
479 self.log.debug("checkpoint file does not exist: %s", cp_path)
470 raise web.HTTPError(404,
480 raise web.HTTPError(404,
471 u'checkpoint does not exist: %s-%s' % (name, checkpoint_id)
481 u'checkpoint does not exist: %s-%s' % (name, checkpoint_id)
472 )
482 )
473 # ensure notebook is readable (never restore from an unreadable notebook)
483 # ensure notebook is readable (never restore from an unreadable notebook)
474 if cp_path.endswith('.ipynb'):
484 if cp_path.endswith('.ipynb'):
475 with io.open(cp_path, 'r', encoding='utf-8') as f:
485 with io.open(cp_path, 'r', encoding='utf-8') as f:
476 current.read(f, u'json')
486 current.read(f, u'json')
477 self._copy(cp_path, nb_path)
487 self._copy(cp_path, nb_path)
478 self.log.debug("copying %s -> %s", cp_path, nb_path)
488 self.log.debug("copying %s -> %s", cp_path, nb_path)
479
489
480 def delete_checkpoint(self, checkpoint_id, name, path=''):
490 def delete_checkpoint(self, checkpoint_id, name, path=''):
481 """delete a file's checkpoint"""
491 """delete a file's checkpoint"""
482 path = path.strip('/')
492 path = path.strip('/')
483 cp_path = self.get_checkpoint_path(checkpoint_id, name, path)
493 cp_path = self.get_checkpoint_path(checkpoint_id, name, path)
484 if not os.path.isfile(cp_path):
494 if not os.path.isfile(cp_path):
485 raise web.HTTPError(404,
495 raise web.HTTPError(404,
486 u'Checkpoint does not exist: %s%s-%s' % (path, name, checkpoint_id)
496 u'Checkpoint does not exist: %s%s-%s' % (path, name, checkpoint_id)
487 )
497 )
488 self.log.debug("unlinking %s", cp_path)
498 self.log.debug("unlinking %s", cp_path)
489 os.unlink(cp_path)
499 os.unlink(cp_path)
490
500
491 def info_string(self):
501 def info_string(self):
492 return "Serving notebooks from local directory: %s" % self.root_dir
502 return "Serving notebooks from local directory: %s" % self.root_dir
493
503
494 def get_kernel_path(self, name, path='', model=None):
504 def get_kernel_path(self, name, path='', model=None):
495 """Return the initial working dir a kernel associated with a given notebook"""
505 """Return the initial working dir a kernel associated with a given notebook"""
496 return os.path.join(self.root_dir, path)
506 return os.path.join(self.root_dir, path)
@@ -1,257 +1,261 b''
1 """A base class for contents managers."""
1 """A base class for contents managers."""
2
2
3 # Copyright (c) IPython Development Team.
3 # Copyright (c) IPython Development Team.
4 # Distributed under the terms of the Modified BSD License.
4 # Distributed under the terms of the Modified BSD License.
5
5
6 from fnmatch import fnmatch
6 from fnmatch import fnmatch
7 import itertools
7 import itertools
8 import os
8 import os
9
9
10 from tornado.web import HTTPError
11
10 from IPython.config.configurable import LoggingConfigurable
12 from IPython.config.configurable import LoggingConfigurable
11 from IPython.nbformat import current, sign
13 from IPython.nbformat import current, sign
12 from IPython.utils.traitlets import Instance, Unicode, List
14 from IPython.utils.traitlets import Instance, Unicode, List
13
15
14
16
15 class ContentsManager(LoggingConfigurable):
17 class ContentsManager(LoggingConfigurable):
16
18
17 notary = Instance(sign.NotebookNotary)
19 notary = Instance(sign.NotebookNotary)
18 def _notary_default(self):
20 def _notary_default(self):
19 return sign.NotebookNotary(parent=self)
21 return sign.NotebookNotary(parent=self)
20
22
21 hide_globs = List(Unicode, [
23 hide_globs = List(Unicode, [
22 u'__pycache__', '*.pyc', '*.pyo',
24 u'__pycache__', '*.pyc', '*.pyo',
23 '.DS_Store', '*.so', '*.dylib', '*~',
25 '.DS_Store', '*.so', '*.dylib', '*~',
24 ], config=True, help="""
26 ], config=True, help="""
25 Glob patterns to hide in file and directory listings.
27 Glob patterns to hide in file and directory listings.
26 """)
28 """)
27
29
28 # ContentsManager API part 1: methods that must be
30 # ContentsManager API part 1: methods that must be
29 # implemented in subclasses.
31 # implemented in subclasses.
30
32
31 def path_exists(self, path):
33 def path_exists(self, path):
32 """Does the API-style path (directory) actually exist?
34 """Does the API-style path (directory) actually exist?
33
35
34 Override this method in subclasses.
36 Override this method in subclasses.
35
37
36 Parameters
38 Parameters
37 ----------
39 ----------
38 path : string
40 path : string
39 The path to check
41 The path to check
40
42
41 Returns
43 Returns
42 -------
44 -------
43 exists : bool
45 exists : bool
44 Whether the path does indeed exist.
46 Whether the path does indeed exist.
45 """
47 """
46 raise NotImplementedError
48 raise NotImplementedError
47
49
48 def is_hidden(self, path):
50 def is_hidden(self, path):
49 """Does the API style path correspond to a hidden directory or file?
51 """Does the API style path correspond to a hidden directory or file?
50
52
51 Parameters
53 Parameters
52 ----------
54 ----------
53 path : string
55 path : string
54 The path to check. This is an API path (`/` separated,
56 The path to check. This is an API path (`/` separated,
55 relative to root dir).
57 relative to root dir).
56
58
57 Returns
59 Returns
58 -------
60 -------
59 exists : bool
61 exists : bool
60 Whether the path is hidden.
62 Whether the path is hidden.
61
63
62 """
64 """
63 raise NotImplementedError
65 raise NotImplementedError
64
66
65 def file_exists(self, name, path=''):
67 def file_exists(self, name, path=''):
66 """Returns a True if the file exists. Else, returns False.
68 """Returns a True if the file exists. Else, returns False.
67
69
68 Parameters
70 Parameters
69 ----------
71 ----------
70 name : string
72 name : string
71 The name of the file you are checking.
73 The name of the file you are checking.
72 path : string
74 path : string
73 The relative path to the file's directory (with '/' as separator)
75 The relative path to the file's directory (with '/' as separator)
74
76
75 Returns
77 Returns
76 -------
78 -------
77 bool
79 bool
78 """
80 """
79 raise NotImplementedError('must be implemented in a subclass')
81 raise NotImplementedError('must be implemented in a subclass')
80
82
81 def list(self, path=''):
83 def list(self, path=''):
82 """Return a list of contents dicts without content.
84 """Return a list of contents dicts without content.
83
85
84 This returns a list of dicts
86 This returns a list of dicts
85
87
86 This list of dicts should be sorted by name::
88 This list of dicts should be sorted by name::
87
89
88 data = sorted(data, key=lambda item: item['name'])
90 data = sorted(data, key=lambda item: item['name'])
89 """
91 """
90 raise NotImplementedError('must be implemented in a subclass')
92 raise NotImplementedError('must be implemented in a subclass')
91
93
92 def get_model(self, name, path='', content=True):
94 def get_model(self, name, path='', content=True):
93 """Get the model of a file or directory with or without content."""
95 """Get the model of a file or directory with or without content."""
94 raise NotImplementedError('must be implemented in a subclass')
96 raise NotImplementedError('must be implemented in a subclass')
95
97
96 def save(self, model, name, path=''):
98 def save(self, model, name, path=''):
97 """Save the file or directory and return the model with no content."""
99 """Save the file or directory and return the model with no content."""
98 raise NotImplementedError('must be implemented in a subclass')
100 raise NotImplementedError('must be implemented in a subclass')
99
101
100 def update(self, model, name, path=''):
102 def update(self, model, name, path=''):
101 """Update the file or directory and return the model with no content."""
103 """Update the file or directory and return the model with no content."""
102 raise NotImplementedError('must be implemented in a subclass')
104 raise NotImplementedError('must be implemented in a subclass')
103
105
104 def delete(self, name, path=''):
106 def delete(self, name, path=''):
105 """Delete file or directory by name and path."""
107 """Delete file or directory by name and path."""
106 raise NotImplementedError('must be implemented in a subclass')
108 raise NotImplementedError('must be implemented in a subclass')
107
109
108 def create_checkpoint(self, name, path=''):
110 def create_checkpoint(self, name, path=''):
109 """Create a checkpoint of the current state of a file
111 """Create a checkpoint of the current state of a file
110
112
111 Returns a checkpoint_id for the new checkpoint.
113 Returns a checkpoint_id for the new checkpoint.
112 """
114 """
113 raise NotImplementedError("must be implemented in a subclass")
115 raise NotImplementedError("must be implemented in a subclass")
114
116
115 def list_checkpoints(self, name, path=''):
117 def list_checkpoints(self, name, path=''):
116 """Return a list of checkpoints for a given file"""
118 """Return a list of checkpoints for a given file"""
117 return []
119 return []
118
120
119 def restore_checkpoint(self, checkpoint_id, name, path=''):
121 def restore_checkpoint(self, checkpoint_id, name, path=''):
120 """Restore a file from one of its checkpoints"""
122 """Restore a file from one of its checkpoints"""
121 raise NotImplementedError("must be implemented in a subclass")
123 raise NotImplementedError("must be implemented in a subclass")
122
124
123 def delete_checkpoint(self, checkpoint_id, name, path=''):
125 def delete_checkpoint(self, checkpoint_id, name, path=''):
124 """delete a checkpoint for a file"""
126 """delete a checkpoint for a file"""
125 raise NotImplementedError("must be implemented in a subclass")
127 raise NotImplementedError("must be implemented in a subclass")
126
128
127 def info_string(self):
129 def info_string(self):
128 return "Serving notebooks"
130 return "Serving notebooks"
129
131
130 # ContentsManager API part 2: methods that have useable default
132 # ContentsManager API part 2: methods that have useable default
131 # implementations, but can be overridden in subclasses.
133 # implementations, but can be overridden in subclasses.
132
134
133 def get_kernel_path(self, name, path='', model=None):
135 def get_kernel_path(self, name, path='', model=None):
134 """ Return the path to start kernel in """
136 """ Return the path to start kernel in """
135 return path
137 return path
136
138
137 def increment_filename(self, filename, path=''):
139 def increment_filename(self, filename, path=''):
138 """Increment a filename until it is unique.
140 """Increment a filename until it is unique.
139
141
140 Parameters
142 Parameters
141 ----------
143 ----------
142 filename : unicode
144 filename : unicode
143 The name of a file, including extension
145 The name of a file, including extension
144 path : unicode
146 path : unicode
145 The URL path of the target's directory
147 The URL path of the target's directory
146
148
147 Returns
149 Returns
148 -------
150 -------
149 name : unicode
151 name : unicode
150 A filename that is unique, based on the input filename.
152 A filename that is unique, based on the input filename.
151 """
153 """
152 path = path.strip('/')
154 path = path.strip('/')
153 basename, ext = os.path.splitext(filename)
155 basename, ext = os.path.splitext(filename)
154 for i in itertools.count():
156 for i in itertools.count():
155 name = u'{basename}{i}{ext}'.format(basename=basename, i=i,
157 name = u'{basename}{i}{ext}'.format(basename=basename, i=i,
156 ext=ext)
158 ext=ext)
157 if not self.file_exists(name, path):
159 if not self.file_exists(name, path):
158 break
160 break
159 return name
161 return name
160
162
161 def create_file(self, model=None, path='', ext='.ipynb'):
163 def create_file(self, model=None, path='', ext='.ipynb'):
162 """Create a new file or directory and return its model with no content."""
164 """Create a new file or directory and return its model with no content."""
163 path = path.strip('/')
165 path = path.strip('/')
164 if model is None:
166 if model is None:
165 model = {}
167 model = {}
166 if 'content' not in model:
168 if 'content' not in model:
167 if ext == '.ipynb':
169 if ext == '.ipynb':
168 metadata = current.new_metadata(name=u'')
170 metadata = current.new_metadata(name=u'')
169 model['content'] = current.new_notebook(metadata=metadata)
171 model['content'] = current.new_notebook(metadata=metadata)
170 model.setdefault('type', 'notebook')
172 model.setdefault('type', 'notebook')
171 model.setdefault('format', 'json')
173 model.setdefault('format', 'json')
172 else:
174 else:
173 model['content'] = ''
175 model['content'] = ''
174 model.setdefault('type', 'file')
176 model.setdefault('type', 'file')
175 model.setdefault('format', 'text')
177 model.setdefault('format', 'text')
176 if 'name' not in model:
178 if 'name' not in model:
177 model['name'] = self.increment_filename('Untitled' + ext, path)
179 model['name'] = self.increment_filename('Untitled' + ext, path)
178
180
179 model['path'] = path
181 model['path'] = path
180 model = self.save(model, model['name'], model['path'])
182 model = self.save(model, model['name'], model['path'])
181 return model
183 return model
182
184
183 def copy(self, from_name, to_name=None, path=''):
185 def copy(self, from_name, to_name=None, path=''):
184 """Copy an existing file and return its new model.
186 """Copy an existing file and return its new model.
185
187
186 If to_name not specified, increment `from_name-Copy#.ipynb`.
188 If to_name not specified, increment `from_name-Copy#.ipynb`.
187 """
189 """
188 path = path.strip('/')
190 path = path.strip('/')
189 model = self.get_model(from_name, path)
191 model = self.get_model(from_name, path)
192 if model['type'] == 'directory':
193 raise HTTPError(400, "Can't copy directories")
190 if not to_name:
194 if not to_name:
191 base, ext = os.path.splitext(from_name)
195 base, ext = os.path.splitext(from_name)
192 copy_name = u'{0}-Copy{1}'.format(base, ext)
196 copy_name = u'{0}-Copy{1}'.format(base, ext)
193 to_name = self.increment_filename(copy_name, path)
197 to_name = self.increment_filename(copy_name, path)
194 model['name'] = to_name
198 model['name'] = to_name
195 model = self.save(model, to_name, path)
199 model = self.save(model, to_name, path)
196 return model
200 return model
197
201
198 def log_info(self):
202 def log_info(self):
199 self.log.info(self.info_string())
203 self.log.info(self.info_string())
200
204
201 def trust_notebook(self, name, path=''):
205 def trust_notebook(self, name, path=''):
202 """Explicitly trust a notebook
206 """Explicitly trust a notebook
203
207
204 Parameters
208 Parameters
205 ----------
209 ----------
206 name : string
210 name : string
207 The filename of the notebook
211 The filename of the notebook
208 path : string
212 path : string
209 The notebook's directory
213 The notebook's directory
210 """
214 """
211 model = self.get_model(name, path)
215 model = self.get_model(name, path)
212 nb = model['content']
216 nb = model['content']
213 self.log.warn("Trusting notebook %s/%s", path, name)
217 self.log.warn("Trusting notebook %s/%s", path, name)
214 self.notary.mark_cells(nb, True)
218 self.notary.mark_cells(nb, True)
215 self.save(model, name, path)
219 self.save(model, name, path)
216
220
217 def check_and_sign(self, nb, name, path=''):
221 def check_and_sign(self, nb, name, path=''):
218 """Check for trusted cells, and sign the notebook.
222 """Check for trusted cells, and sign the notebook.
219
223
220 Called as a part of saving notebooks.
224 Called as a part of saving notebooks.
221
225
222 Parameters
226 Parameters
223 ----------
227 ----------
224 nb : dict
228 nb : dict
225 The notebook structure
229 The notebook structure
226 name : string
230 name : string
227 The filename of the notebook
231 The filename of the notebook
228 path : string
232 path : string
229 The notebook's directory
233 The notebook's directory
230 """
234 """
231 if self.notary.check_cells(nb):
235 if self.notary.check_cells(nb):
232 self.notary.sign(nb)
236 self.notary.sign(nb)
233 else:
237 else:
234 self.log.warn("Saving untrusted notebook %s/%s", path, name)
238 self.log.warn("Saving untrusted notebook %s/%s", path, name)
235
239
236 def mark_trusted_cells(self, nb, name, path=''):
240 def mark_trusted_cells(self, nb, name, path=''):
237 """Mark cells as trusted if the notebook signature matches.
241 """Mark cells as trusted if the notebook signature matches.
238
242
239 Called as a part of loading notebooks.
243 Called as a part of loading notebooks.
240
244
241 Parameters
245 Parameters
242 ----------
246 ----------
243 nb : dict
247 nb : dict
244 The notebook structure
248 The notebook structure
245 name : string
249 name : string
246 The filename of the notebook
250 The filename of the notebook
247 path : string
251 path : string
248 The notebook's directory
252 The notebook's directory
249 """
253 """
250 trusted = self.notary.check_signature(nb)
254 trusted = self.notary.check_signature(nb)
251 if not trusted:
255 if not trusted:
252 self.log.warn("Notebook %s/%s is not trusted", path, name)
256 self.log.warn("Notebook %s/%s is not trusted", path, name)
253 self.notary.mark_cells(nb, trusted)
257 self.notary.mark_cells(nb, trusted)
254
258
255 def should_list(self, name):
259 def should_list(self, name):
256 """Should this file/directory name be displayed in a listing?"""
260 """Should this file/directory name be displayed in a listing?"""
257 return not any(fnmatch(name, glob) for glob in self.hide_globs)
261 return not any(fnmatch(name, glob) for glob in self.hide_globs)
@@ -1,459 +1,479 b''
1 # coding: utf-8
1 # coding: utf-8
2 """Test the contents webservice API."""
2 """Test the contents webservice API."""
3
3
4 import base64
4 import base64
5 import io
5 import io
6 import json
6 import json
7 import os
7 import os
8 import shutil
8 import shutil
9 from unicodedata import normalize
9 from unicodedata import normalize
10
10
11 pjoin = os.path.join
11 pjoin = os.path.join
12
12
13 import requests
13 import requests
14
14
15 from IPython.html.utils import url_path_join, url_escape
15 from IPython.html.utils import url_path_join, url_escape
16 from IPython.html.tests.launchnotebook import NotebookTestBase, assert_http_error
16 from IPython.html.tests.launchnotebook import NotebookTestBase, assert_http_error
17 from IPython.nbformat import current
17 from IPython.nbformat import current
18 from IPython.nbformat.current import (new_notebook, write, read, new_worksheet,
18 from IPython.nbformat.current import (new_notebook, write, read, new_worksheet,
19 new_heading_cell, to_notebook_json)
19 new_heading_cell, to_notebook_json)
20 from IPython.nbformat import v2
20 from IPython.nbformat import v2
21 from IPython.utils import py3compat
21 from IPython.utils import py3compat
22 from IPython.utils.data import uniq_stable
22 from IPython.utils.data import uniq_stable
23
23
24
24
25 # TODO: Remove this after we create the contents web service and directories are
25 # TODO: Remove this after we create the contents web service and directories are
26 # no longer listed by the notebook web service.
26 # no longer listed by the notebook web service.
27 def notebooks_only(dir_model):
27 def notebooks_only(dir_model):
28 return [nb for nb in dir_model['content'] if nb['type']=='notebook']
28 return [nb for nb in dir_model['content'] if nb['type']=='notebook']
29
29
30 def dirs_only(dir_model):
30 def dirs_only(dir_model):
31 return [x for x in dir_model['content'] if x['type']=='directory']
31 return [x for x in dir_model['content'] if x['type']=='directory']
32
32
33
33
34 class API(object):
34 class API(object):
35 """Wrapper for contents API calls."""
35 """Wrapper for contents API calls."""
36 def __init__(self, base_url):
36 def __init__(self, base_url):
37 self.base_url = base_url
37 self.base_url = base_url
38
38
39 def _req(self, verb, path, body=None):
39 def _req(self, verb, path, body=None):
40 response = requests.request(verb,
40 response = requests.request(verb,
41 url_path_join(self.base_url, 'api/contents', path),
41 url_path_join(self.base_url, 'api/contents', path),
42 data=body,
42 data=body,
43 )
43 )
44 response.raise_for_status()
44 response.raise_for_status()
45 return response
45 return response
46
46
47 def list(self, path='/'):
47 def list(self, path='/'):
48 return self._req('GET', path)
48 return self._req('GET', path)
49
49
50 def read(self, name, path='/'):
50 def read(self, name, path='/'):
51 return self._req('GET', url_path_join(path, name))
51 return self._req('GET', url_path_join(path, name))
52
52
53 def create_untitled(self, path='/', ext=None):
53 def create_untitled(self, path='/', ext=None):
54 body = None
54 body = None
55 if ext:
55 if ext:
56 body = json.dumps({'ext': ext})
56 body = json.dumps({'ext': ext})
57 return self._req('POST', path, body)
57 return self._req('POST', path, body)
58
58
59 def upload_untitled(self, body, path='/'):
59 def upload_untitled(self, body, path='/'):
60 return self._req('POST', path, body)
60 return self._req('POST', path, body)
61
61
62 def copy_untitled(self, copy_from, path='/'):
62 def copy_untitled(self, copy_from, path='/'):
63 body = json.dumps({'copy_from':copy_from})
63 body = json.dumps({'copy_from':copy_from})
64 return self._req('POST', path, body)
64 return self._req('POST', path, body)
65
65
66 def create(self, name, path='/'):
66 def create(self, name, path='/'):
67 return self._req('PUT', url_path_join(path, name))
67 return self._req('PUT', url_path_join(path, name))
68
68
69 def upload(self, name, body, path='/'):
69 def upload(self, name, body, path='/'):
70 return self._req('PUT', url_path_join(path, name), body)
70 return self._req('PUT', url_path_join(path, name), body)
71
71
72 def mkdir(self, name, path='/'):
73 return self._req('PUT', url_path_join(path, name), json.dumps({'type': 'directory'}))
74
72 def copy(self, copy_from, copy_to, path='/'):
75 def copy(self, copy_from, copy_to, path='/'):
73 body = json.dumps({'copy_from':copy_from})
76 body = json.dumps({'copy_from':copy_from})
74 return self._req('PUT', url_path_join(path, copy_to), body)
77 return self._req('PUT', url_path_join(path, copy_to), body)
75
78
76 def save(self, name, body, path='/'):
79 def save(self, name, body, path='/'):
77 return self._req('PUT', url_path_join(path, name), body)
80 return self._req('PUT', url_path_join(path, name), body)
78
81
79 def delete(self, name, path='/'):
82 def delete(self, name, path='/'):
80 return self._req('DELETE', url_path_join(path, name))
83 return self._req('DELETE', url_path_join(path, name))
81
84
82 def rename(self, name, path, new_name):
85 def rename(self, name, path, new_name):
83 body = json.dumps({'name': new_name})
86 body = json.dumps({'name': new_name})
84 return self._req('PATCH', url_path_join(path, name), body)
87 return self._req('PATCH', url_path_join(path, name), body)
85
88
86 def get_checkpoints(self, name, path):
89 def get_checkpoints(self, name, path):
87 return self._req('GET', url_path_join(path, name, 'checkpoints'))
90 return self._req('GET', url_path_join(path, name, 'checkpoints'))
88
91
89 def new_checkpoint(self, name, path):
92 def new_checkpoint(self, name, path):
90 return self._req('POST', url_path_join(path, name, 'checkpoints'))
93 return self._req('POST', url_path_join(path, name, 'checkpoints'))
91
94
92 def restore_checkpoint(self, name, path, checkpoint_id):
95 def restore_checkpoint(self, name, path, checkpoint_id):
93 return self._req('POST', url_path_join(path, name, 'checkpoints', checkpoint_id))
96 return self._req('POST', url_path_join(path, name, 'checkpoints', checkpoint_id))
94
97
95 def delete_checkpoint(self, name, path, checkpoint_id):
98 def delete_checkpoint(self, name, path, checkpoint_id):
96 return self._req('DELETE', url_path_join(path, name, 'checkpoints', checkpoint_id))
99 return self._req('DELETE', url_path_join(path, name, 'checkpoints', checkpoint_id))
97
100
98 class APITest(NotebookTestBase):
101 class APITest(NotebookTestBase):
99 """Test the kernels web service API"""
102 """Test the kernels web service API"""
100 dirs_nbs = [('', 'inroot'),
103 dirs_nbs = [('', 'inroot'),
101 ('Directory with spaces in', 'inspace'),
104 ('Directory with spaces in', 'inspace'),
102 (u'unicodé', 'innonascii'),
105 (u'unicodé', 'innonascii'),
103 ('foo', 'a'),
106 ('foo', 'a'),
104 ('foo', 'b'),
107 ('foo', 'b'),
105 ('foo', 'name with spaces'),
108 ('foo', 'name with spaces'),
106 ('foo', u'unicodé'),
109 ('foo', u'unicodé'),
107 ('foo/bar', 'baz'),
110 ('foo/bar', 'baz'),
108 ('ordering', 'A'),
111 ('ordering', 'A'),
109 ('ordering', 'b'),
112 ('ordering', 'b'),
110 ('ordering', 'C'),
113 ('ordering', 'C'),
111 (u'å b', u'ç d'),
114 (u'å b', u'ç d'),
112 ]
115 ]
113 hidden_dirs = ['.hidden', '__pycache__']
116 hidden_dirs = ['.hidden', '__pycache__']
114
117
115 dirs = uniq_stable([py3compat.cast_unicode(d) for (d,n) in dirs_nbs])
118 dirs = uniq_stable([py3compat.cast_unicode(d) for (d,n) in dirs_nbs])
116 del dirs[0] # remove ''
119 del dirs[0] # remove ''
117 top_level_dirs = {normalize('NFC', d.split('/')[0]) for d in dirs}
120 top_level_dirs = {normalize('NFC', d.split('/')[0]) for d in dirs}
118
121
119 @staticmethod
122 @staticmethod
120 def _blob_for_name(name):
123 def _blob_for_name(name):
121 return name.encode('utf-8') + b'\xFF'
124 return name.encode('utf-8') + b'\xFF'
122
125
123 @staticmethod
126 @staticmethod
124 def _txt_for_name(name):
127 def _txt_for_name(name):
125 return u'%s text file' % name
128 return u'%s text file' % name
126
129
127 def setUp(self):
130 def setUp(self):
128 nbdir = self.notebook_dir.name
131 nbdir = self.notebook_dir.name
129 self.blob = os.urandom(100)
132 self.blob = os.urandom(100)
130 self.b64_blob = base64.encodestring(self.blob).decode('ascii')
133 self.b64_blob = base64.encodestring(self.blob).decode('ascii')
131
134
132
135
133
136
134 for d in (self.dirs + self.hidden_dirs):
137 for d in (self.dirs + self.hidden_dirs):
135 d.replace('/', os.sep)
138 d.replace('/', os.sep)
136 if not os.path.isdir(pjoin(nbdir, d)):
139 if not os.path.isdir(pjoin(nbdir, d)):
137 os.mkdir(pjoin(nbdir, d))
140 os.mkdir(pjoin(nbdir, d))
138
141
139 for d, name in self.dirs_nbs:
142 for d, name in self.dirs_nbs:
140 d = d.replace('/', os.sep)
143 d = d.replace('/', os.sep)
141 # create a notebook
144 # create a notebook
142 with io.open(pjoin(nbdir, d, '%s.ipynb' % name), 'w',
145 with io.open(pjoin(nbdir, d, '%s.ipynb' % name), 'w',
143 encoding='utf-8') as f:
146 encoding='utf-8') as f:
144 nb = new_notebook(name=name)
147 nb = new_notebook(name=name)
145 write(nb, f, format='ipynb')
148 write(nb, f, format='ipynb')
146
149
147 # create a text file
150 # create a text file
148 with io.open(pjoin(nbdir, d, '%s.txt' % name), 'w',
151 with io.open(pjoin(nbdir, d, '%s.txt' % name), 'w',
149 encoding='utf-8') as f:
152 encoding='utf-8') as f:
150 f.write(self._txt_for_name(name))
153 f.write(self._txt_for_name(name))
151
154
152 # create a binary file
155 # create a binary file
153 with io.open(pjoin(nbdir, d, '%s.blob' % name), 'wb') as f:
156 with io.open(pjoin(nbdir, d, '%s.blob' % name), 'wb') as f:
154 f.write(self._blob_for_name(name))
157 f.write(self._blob_for_name(name))
155
158
156 self.api = API(self.base_url())
159 self.api = API(self.base_url())
157
160
158 def tearDown(self):
161 def tearDown(self):
159 nbdir = self.notebook_dir.name
162 nbdir = self.notebook_dir.name
160
163
161 for dname in (list(self.top_level_dirs) + self.hidden_dirs):
164 for dname in (list(self.top_level_dirs) + self.hidden_dirs):
162 shutil.rmtree(pjoin(nbdir, dname), ignore_errors=True)
165 shutil.rmtree(pjoin(nbdir, dname), ignore_errors=True)
163
166
164 if os.path.isfile(pjoin(nbdir, 'inroot.ipynb')):
167 if os.path.isfile(pjoin(nbdir, 'inroot.ipynb')):
165 os.unlink(pjoin(nbdir, 'inroot.ipynb'))
168 os.unlink(pjoin(nbdir, 'inroot.ipynb'))
166
169
167 def test_list_notebooks(self):
170 def test_list_notebooks(self):
168 nbs = notebooks_only(self.api.list().json())
171 nbs = notebooks_only(self.api.list().json())
169 self.assertEqual(len(nbs), 1)
172 self.assertEqual(len(nbs), 1)
170 self.assertEqual(nbs[0]['name'], 'inroot.ipynb')
173 self.assertEqual(nbs[0]['name'], 'inroot.ipynb')
171
174
172 nbs = notebooks_only(self.api.list('/Directory with spaces in/').json())
175 nbs = notebooks_only(self.api.list('/Directory with spaces in/').json())
173 self.assertEqual(len(nbs), 1)
176 self.assertEqual(len(nbs), 1)
174 self.assertEqual(nbs[0]['name'], 'inspace.ipynb')
177 self.assertEqual(nbs[0]['name'], 'inspace.ipynb')
175
178
176 nbs = notebooks_only(self.api.list(u'/unicodé/').json())
179 nbs = notebooks_only(self.api.list(u'/unicodé/').json())
177 self.assertEqual(len(nbs), 1)
180 self.assertEqual(len(nbs), 1)
178 self.assertEqual(nbs[0]['name'], 'innonascii.ipynb')
181 self.assertEqual(nbs[0]['name'], 'innonascii.ipynb')
179 self.assertEqual(nbs[0]['path'], u'unicodé')
182 self.assertEqual(nbs[0]['path'], u'unicodé')
180
183
181 nbs = notebooks_only(self.api.list('/foo/bar/').json())
184 nbs = notebooks_only(self.api.list('/foo/bar/').json())
182 self.assertEqual(len(nbs), 1)
185 self.assertEqual(len(nbs), 1)
183 self.assertEqual(nbs[0]['name'], 'baz.ipynb')
186 self.assertEqual(nbs[0]['name'], 'baz.ipynb')
184 self.assertEqual(nbs[0]['path'], 'foo/bar')
187 self.assertEqual(nbs[0]['path'], 'foo/bar')
185
188
186 nbs = notebooks_only(self.api.list('foo').json())
189 nbs = notebooks_only(self.api.list('foo').json())
187 self.assertEqual(len(nbs), 4)
190 self.assertEqual(len(nbs), 4)
188 nbnames = { normalize('NFC', n['name']) for n in nbs }
191 nbnames = { normalize('NFC', n['name']) for n in nbs }
189 expected = [ u'a.ipynb', u'b.ipynb', u'name with spaces.ipynb', u'unicodé.ipynb']
192 expected = [ u'a.ipynb', u'b.ipynb', u'name with spaces.ipynb', u'unicodé.ipynb']
190 expected = { normalize('NFC', name) for name in expected }
193 expected = { normalize('NFC', name) for name in expected }
191 self.assertEqual(nbnames, expected)
194 self.assertEqual(nbnames, expected)
192
195
193 nbs = notebooks_only(self.api.list('ordering').json())
196 nbs = notebooks_only(self.api.list('ordering').json())
194 nbnames = [n['name'] for n in nbs]
197 nbnames = [n['name'] for n in nbs]
195 expected = ['A.ipynb', 'b.ipynb', 'C.ipynb']
198 expected = ['A.ipynb', 'b.ipynb', 'C.ipynb']
196 self.assertEqual(nbnames, expected)
199 self.assertEqual(nbnames, expected)
197
200
198 def test_list_dirs(self):
201 def test_list_dirs(self):
199 dirs = dirs_only(self.api.list().json())
202 dirs = dirs_only(self.api.list().json())
200 dir_names = {normalize('NFC', d['name']) for d in dirs}
203 dir_names = {normalize('NFC', d['name']) for d in dirs}
201 self.assertEqual(dir_names, self.top_level_dirs) # Excluding hidden dirs
204 self.assertEqual(dir_names, self.top_level_dirs) # Excluding hidden dirs
202
205
203 def test_list_nonexistant_dir(self):
206 def test_list_nonexistant_dir(self):
204 with assert_http_error(404):
207 with assert_http_error(404):
205 self.api.list('nonexistant')
208 self.api.list('nonexistant')
206
209
207 def test_get_nb_contents(self):
210 def test_get_nb_contents(self):
208 for d, name in self.dirs_nbs:
211 for d, name in self.dirs_nbs:
209 nb = self.api.read('%s.ipynb' % name, d+'/').json()
212 nb = self.api.read('%s.ipynb' % name, d+'/').json()
210 self.assertEqual(nb['name'], u'%s.ipynb' % name)
213 self.assertEqual(nb['name'], u'%s.ipynb' % name)
211 self.assertEqual(nb['type'], 'notebook')
214 self.assertEqual(nb['type'], 'notebook')
212 self.assertIn('content', nb)
215 self.assertIn('content', nb)
213 self.assertEqual(nb['format'], 'json')
216 self.assertEqual(nb['format'], 'json')
214 self.assertIn('content', nb)
217 self.assertIn('content', nb)
215 self.assertIn('metadata', nb['content'])
218 self.assertIn('metadata', nb['content'])
216 self.assertIsInstance(nb['content']['metadata'], dict)
219 self.assertIsInstance(nb['content']['metadata'], dict)
217
220
218 def test_get_contents_no_such_file(self):
221 def test_get_contents_no_such_file(self):
219 # Name that doesn't exist - should be a 404
222 # Name that doesn't exist - should be a 404
220 with assert_http_error(404):
223 with assert_http_error(404):
221 self.api.read('q.ipynb', 'foo')
224 self.api.read('q.ipynb', 'foo')
222
225
223 def test_get_text_file_contents(self):
226 def test_get_text_file_contents(self):
224 for d, name in self.dirs_nbs:
227 for d, name in self.dirs_nbs:
225 model = self.api.read(u'%s.txt' % name, d+'/').json()
228 model = self.api.read(u'%s.txt' % name, d+'/').json()
226 self.assertEqual(model['name'], u'%s.txt' % name)
229 self.assertEqual(model['name'], u'%s.txt' % name)
227 self.assertIn('content', model)
230 self.assertIn('content', model)
228 self.assertEqual(model['format'], 'text')
231 self.assertEqual(model['format'], 'text')
229 self.assertEqual(model['type'], 'file')
232 self.assertEqual(model['type'], 'file')
230 self.assertEqual(model['content'], self._txt_for_name(name))
233 self.assertEqual(model['content'], self._txt_for_name(name))
231
234
232 # Name that doesn't exist - should be a 404
235 # Name that doesn't exist - should be a 404
233 with assert_http_error(404):
236 with assert_http_error(404):
234 self.api.read('q.txt', 'foo')
237 self.api.read('q.txt', 'foo')
235
238
236 def test_get_binary_file_contents(self):
239 def test_get_binary_file_contents(self):
237 for d, name in self.dirs_nbs:
240 for d, name in self.dirs_nbs:
238 model = self.api.read(u'%s.blob' % name, d+'/').json()
241 model = self.api.read(u'%s.blob' % name, d+'/').json()
239 self.assertEqual(model['name'], u'%s.blob' % name)
242 self.assertEqual(model['name'], u'%s.blob' % name)
240 self.assertIn('content', model)
243 self.assertIn('content', model)
241 self.assertEqual(model['format'], 'base64')
244 self.assertEqual(model['format'], 'base64')
242 self.assertEqual(model['type'], 'file')
245 self.assertEqual(model['type'], 'file')
243 b64_data = base64.encodestring(self._blob_for_name(name)).decode('ascii')
246 b64_data = base64.encodestring(self._blob_for_name(name)).decode('ascii')
244 self.assertEqual(model['content'], b64_data)
247 self.assertEqual(model['content'], b64_data)
245
248
246 # Name that doesn't exist - should be a 404
249 # Name that doesn't exist - should be a 404
247 with assert_http_error(404):
250 with assert_http_error(404):
248 self.api.read('q.txt', 'foo')
251 self.api.read('q.txt', 'foo')
249
252
250 def _check_created(self, resp, name, path, type='notebook'):
253 def _check_created(self, resp, name, path, type='notebook'):
251 self.assertEqual(resp.status_code, 201)
254 self.assertEqual(resp.status_code, 201)
252 location_header = py3compat.str_to_unicode(resp.headers['Location'])
255 location_header = py3compat.str_to_unicode(resp.headers['Location'])
253 self.assertEqual(location_header, url_escape(url_path_join(u'/api/contents', path, name)))
256 self.assertEqual(location_header, url_escape(url_path_join(u'/api/contents', path, name)))
254 rjson = resp.json()
257 rjson = resp.json()
255 self.assertEqual(rjson['name'], name)
258 self.assertEqual(rjson['name'], name)
256 self.assertEqual(rjson['path'], path)
259 self.assertEqual(rjson['path'], path)
257 self.assertEqual(rjson['type'], type)
260 self.assertEqual(rjson['type'], type)
258 isright = os.path.isdir if type == 'directory' else os.path.isfile
261 isright = os.path.isdir if type == 'directory' else os.path.isfile
259 assert isright(pjoin(
262 assert isright(pjoin(
260 self.notebook_dir.name,
263 self.notebook_dir.name,
261 path.replace('/', os.sep),
264 path.replace('/', os.sep),
262 name,
265 name,
263 ))
266 ))
264
267
265 def test_create_untitled(self):
268 def test_create_untitled(self):
266 resp = self.api.create_untitled(path=u'å b')
269 resp = self.api.create_untitled(path=u'å b')
267 self._check_created(resp, 'Untitled0.ipynb', u'å b')
270 self._check_created(resp, 'Untitled0.ipynb', u'å b')
268
271
269 # Second time
272 # Second time
270 resp = self.api.create_untitled(path=u'å b')
273 resp = self.api.create_untitled(path=u'å b')
271 self._check_created(resp, 'Untitled1.ipynb', u'å b')
274 self._check_created(resp, 'Untitled1.ipynb', u'å b')
272
275
273 # And two directories down
276 # And two directories down
274 resp = self.api.create_untitled(path='foo/bar')
277 resp = self.api.create_untitled(path='foo/bar')
275 self._check_created(resp, 'Untitled0.ipynb', 'foo/bar')
278 self._check_created(resp, 'Untitled0.ipynb', 'foo/bar')
276
279
277 def test_create_untitled_txt(self):
280 def test_create_untitled_txt(self):
278 resp = self.api.create_untitled(path='foo/bar', ext='.txt')
281 resp = self.api.create_untitled(path='foo/bar', ext='.txt')
279 self._check_created(resp, 'Untitled0.txt', 'foo/bar', type='file')
282 self._check_created(resp, 'Untitled0.txt', 'foo/bar', type='file')
280
283
281 resp = self.api.read(path='foo/bar', name='Untitled0.txt')
284 resp = self.api.read(path='foo/bar', name='Untitled0.txt')
282 model = resp.json()
285 model = resp.json()
283 self.assertEqual(model['type'], 'file')
286 self.assertEqual(model['type'], 'file')
284 self.assertEqual(model['format'], 'text')
287 self.assertEqual(model['format'], 'text')
285 self.assertEqual(model['content'], '')
288 self.assertEqual(model['content'], '')
286
289
287 def test_upload_untitled(self):
290 def test_upload_untitled(self):
288 nb = new_notebook(name='Upload test')
291 nb = new_notebook(name='Upload test')
289 nbmodel = {'content': nb, 'type': 'notebook'}
292 nbmodel = {'content': nb, 'type': 'notebook'}
290 resp = self.api.upload_untitled(path=u'å b',
293 resp = self.api.upload_untitled(path=u'å b',
291 body=json.dumps(nbmodel))
294 body=json.dumps(nbmodel))
292 self._check_created(resp, 'Untitled0.ipynb', u'å b')
295 self._check_created(resp, 'Untitled0.ipynb', u'å b')
293
296
294 def test_upload(self):
297 def test_upload(self):
295 nb = new_notebook(name=u'ignored')
298 nb = new_notebook(name=u'ignored')
296 nbmodel = {'content': nb, 'type': 'notebook'}
299 nbmodel = {'content': nb, 'type': 'notebook'}
297 resp = self.api.upload(u'Upload tést.ipynb', path=u'å b',
300 resp = self.api.upload(u'Upload tést.ipynb', path=u'å b',
298 body=json.dumps(nbmodel))
301 body=json.dumps(nbmodel))
299 self._check_created(resp, u'Upload tést.ipynb', u'å b')
302 self._check_created(resp, u'Upload tést.ipynb', u'å b')
300
303
301 def test_mkdir(self):
304 def test_mkdir(self):
302 model = {'type': 'directory'}
305 resp = self.api.mkdir(u'New ∂ir', path=u'å b')
303 resp = self.api.upload(u'New ∂ir', path=u'å b',
304 body=json.dumps(model))
305 self._check_created(resp, u'New ∂ir', u'å b', type='directory')
306 self._check_created(resp, u'New ∂ir', u'å b', type='directory')
306
307
307 def test_upload_txt(self):
308 def test_upload_txt(self):
308 body = u'ünicode téxt'
309 body = u'ünicode téxt'
309 model = {
310 model = {
310 'content' : body,
311 'content' : body,
311 'format' : 'text',
312 'format' : 'text',
312 'type' : 'file',
313 'type' : 'file',
313 }
314 }
314 resp = self.api.upload(u'Upload tést.txt', path=u'å b',
315 resp = self.api.upload(u'Upload tést.txt', path=u'å b',
315 body=json.dumps(model))
316 body=json.dumps(model))
316
317
317 # check roundtrip
318 # check roundtrip
318 resp = self.api.read(path=u'å b', name=u'Upload tést.txt')
319 resp = self.api.read(path=u'å b', name=u'Upload tést.txt')
319 model = resp.json()
320 model = resp.json()
320 self.assertEqual(model['type'], 'file')
321 self.assertEqual(model['type'], 'file')
321 self.assertEqual(model['format'], 'text')
322 self.assertEqual(model['format'], 'text')
322 self.assertEqual(model['content'], body)
323 self.assertEqual(model['content'], body)
323
324
324 def test_upload_b64(self):
325 def test_upload_b64(self):
325 body = b'\xFFblob'
326 body = b'\xFFblob'
326 b64body = base64.encodestring(body).decode('ascii')
327 b64body = base64.encodestring(body).decode('ascii')
327 model = {
328 model = {
328 'content' : b64body,
329 'content' : b64body,
329 'format' : 'base64',
330 'format' : 'base64',
330 'type' : 'file',
331 'type' : 'file',
331 }
332 }
332 resp = self.api.upload(u'Upload tést.blob', path=u'å b',
333 resp = self.api.upload(u'Upload tést.blob', path=u'å b',
333 body=json.dumps(model))
334 body=json.dumps(model))
334
335
335 # check roundtrip
336 # check roundtrip
336 resp = self.api.read(path=u'å b', name=u'Upload tést.blob')
337 resp = self.api.read(path=u'å b', name=u'Upload tést.blob')
337 model = resp.json()
338 model = resp.json()
338 self.assertEqual(model['type'], 'file')
339 self.assertEqual(model['type'], 'file')
339 self.assertEqual(model['format'], 'base64')
340 self.assertEqual(model['format'], 'base64')
340 decoded = base64.decodestring(model['content'].encode('ascii'))
341 decoded = base64.decodestring(model['content'].encode('ascii'))
341 self.assertEqual(decoded, body)
342 self.assertEqual(decoded, body)
342
343
343 def test_upload_v2(self):
344 def test_upload_v2(self):
344 nb = v2.new_notebook()
345 nb = v2.new_notebook()
345 ws = v2.new_worksheet()
346 ws = v2.new_worksheet()
346 nb.worksheets.append(ws)
347 nb.worksheets.append(ws)
347 ws.cells.append(v2.new_code_cell(input='print("hi")'))
348 ws.cells.append(v2.new_code_cell(input='print("hi")'))
348 nbmodel = {'content': nb, 'type': 'notebook'}
349 nbmodel = {'content': nb, 'type': 'notebook'}
349 resp = self.api.upload(u'Upload tést.ipynb', path=u'å b',
350 resp = self.api.upload(u'Upload tést.ipynb', path=u'å b',
350 body=json.dumps(nbmodel))
351 body=json.dumps(nbmodel))
351 self._check_created(resp, u'Upload tést.ipynb', u'å b')
352 self._check_created(resp, u'Upload tést.ipynb', u'å b')
352 resp = self.api.read(u'Upload tést.ipynb', u'å b')
353 resp = self.api.read(u'Upload tést.ipynb', u'å b')
353 data = resp.json()
354 data = resp.json()
354 self.assertEqual(data['content']['nbformat'], current.nbformat)
355 self.assertEqual(data['content']['nbformat'], current.nbformat)
355 self.assertEqual(data['content']['orig_nbformat'], 2)
356 self.assertEqual(data['content']['orig_nbformat'], 2)
356
357
357 def test_copy_untitled(self):
358 def test_copy_untitled(self):
358 resp = self.api.copy_untitled(u'ç d.ipynb', path=u'å b')
359 resp = self.api.copy_untitled(u'ç d.ipynb', path=u'å b')
359 self._check_created(resp, u'ç d-Copy0.ipynb', u'å b')
360 self._check_created(resp, u'ç d-Copy0.ipynb', u'å b')
360
361
361 def test_copy(self):
362 def test_copy(self):
362 resp = self.api.copy(u'ç d.ipynb', u'cøpy.ipynb', path=u'å b')
363 resp = self.api.copy(u'ç d.ipynb', u'cøpy.ipynb', path=u'å b')
363 self._check_created(resp, u'cøpy.ipynb', u'å b')
364 self._check_created(resp, u'cøpy.ipynb', u'å b')
364
365
366 def test_copy_dir_400(self):
367 # can't copy directories
368 with assert_http_error(400):
369 resp = self.api.copy(u'å b', u'å c')
370
365 def test_delete(self):
371 def test_delete(self):
366 for d, name in self.dirs_nbs:
372 for d, name in self.dirs_nbs:
367 resp = self.api.delete('%s.ipynb' % name, d)
373 resp = self.api.delete('%s.ipynb' % name, d)
368 self.assertEqual(resp.status_code, 204)
374 self.assertEqual(resp.status_code, 204)
369
375
370 for d in self.dirs + ['/']:
376 for d in self.dirs + ['/']:
371 nbs = notebooks_only(self.api.list(d).json())
377 nbs = notebooks_only(self.api.list(d).json())
372 self.assertEqual(len(nbs), 0)
378 self.assertEqual(len(nbs), 0)
373
379
380 def test_delete_dirs(self):
381 # depth-first delete everything, so we don't try to delete empty directories
382 for name in sorted(self.dirs + ['/'], key=len, reverse=True):
383 listing = self.api.list(name).json()['content']
384 for model in listing:
385 self.api.delete(model['name'], model['path'])
386 listing = self.api.list('/').json()['content']
387 self.assertEqual(listing, [])
388
389 def test_delete_non_empty_dir(self):
390 """delete non-empty dir raises 400"""
391 with assert_http_error(400):
392 self.api.delete(u'å b')
393
374 def test_rename(self):
394 def test_rename(self):
375 resp = self.api.rename('a.ipynb', 'foo', 'z.ipynb')
395 resp = self.api.rename('a.ipynb', 'foo', 'z.ipynb')
376 self.assertEqual(resp.headers['Location'].split('/')[-1], 'z.ipynb')
396 self.assertEqual(resp.headers['Location'].split('/')[-1], 'z.ipynb')
377 self.assertEqual(resp.json()['name'], 'z.ipynb')
397 self.assertEqual(resp.json()['name'], 'z.ipynb')
378 assert os.path.isfile(pjoin(self.notebook_dir.name, 'foo', 'z.ipynb'))
398 assert os.path.isfile(pjoin(self.notebook_dir.name, 'foo', 'z.ipynb'))
379
399
380 nbs = notebooks_only(self.api.list('foo').json())
400 nbs = notebooks_only(self.api.list('foo').json())
381 nbnames = set(n['name'] for n in nbs)
401 nbnames = set(n['name'] for n in nbs)
382 self.assertIn('z.ipynb', nbnames)
402 self.assertIn('z.ipynb', nbnames)
383 self.assertNotIn('a.ipynb', nbnames)
403 self.assertNotIn('a.ipynb', nbnames)
384
404
385 def test_rename_existing(self):
405 def test_rename_existing(self):
386 with assert_http_error(409):
406 with assert_http_error(409):
387 self.api.rename('a.ipynb', 'foo', 'b.ipynb')
407 self.api.rename('a.ipynb', 'foo', 'b.ipynb')
388
408
389 def test_save(self):
409 def test_save(self):
390 resp = self.api.read('a.ipynb', 'foo')
410 resp = self.api.read('a.ipynb', 'foo')
391 nbcontent = json.loads(resp.text)['content']
411 nbcontent = json.loads(resp.text)['content']
392 nb = to_notebook_json(nbcontent)
412 nb = to_notebook_json(nbcontent)
393 ws = new_worksheet()
413 ws = new_worksheet()
394 nb.worksheets = [ws]
414 nb.worksheets = [ws]
395 ws.cells.append(new_heading_cell(u'Created by test ³'))
415 ws.cells.append(new_heading_cell(u'Created by test ³'))
396
416
397 nbmodel= {'name': 'a.ipynb', 'path':'foo', 'content': nb, 'type': 'notebook'}
417 nbmodel= {'name': 'a.ipynb', 'path':'foo', 'content': nb, 'type': 'notebook'}
398 resp = self.api.save('a.ipynb', path='foo', body=json.dumps(nbmodel))
418 resp = self.api.save('a.ipynb', path='foo', body=json.dumps(nbmodel))
399
419
400 nbfile = pjoin(self.notebook_dir.name, 'foo', 'a.ipynb')
420 nbfile = pjoin(self.notebook_dir.name, 'foo', 'a.ipynb')
401 with io.open(nbfile, 'r', encoding='utf-8') as f:
421 with io.open(nbfile, 'r', encoding='utf-8') as f:
402 newnb = read(f, format='ipynb')
422 newnb = read(f, format='ipynb')
403 self.assertEqual(newnb.worksheets[0].cells[0].source,
423 self.assertEqual(newnb.worksheets[0].cells[0].source,
404 u'Created by test ³')
424 u'Created by test ³')
405 nbcontent = self.api.read('a.ipynb', 'foo').json()['content']
425 nbcontent = self.api.read('a.ipynb', 'foo').json()['content']
406 newnb = to_notebook_json(nbcontent)
426 newnb = to_notebook_json(nbcontent)
407 self.assertEqual(newnb.worksheets[0].cells[0].source,
427 self.assertEqual(newnb.worksheets[0].cells[0].source,
408 u'Created by test ³')
428 u'Created by test ³')
409
429
410 # Save and rename
430 # Save and rename
411 nbmodel= {'name': 'a2.ipynb', 'path':'foo/bar', 'content': nb, 'type': 'notebook'}
431 nbmodel= {'name': 'a2.ipynb', 'path':'foo/bar', 'content': nb, 'type': 'notebook'}
412 resp = self.api.save('a.ipynb', path='foo', body=json.dumps(nbmodel))
432 resp = self.api.save('a.ipynb', path='foo', body=json.dumps(nbmodel))
413 saved = resp.json()
433 saved = resp.json()
414 self.assertEqual(saved['name'], 'a2.ipynb')
434 self.assertEqual(saved['name'], 'a2.ipynb')
415 self.assertEqual(saved['path'], 'foo/bar')
435 self.assertEqual(saved['path'], 'foo/bar')
416 assert os.path.isfile(pjoin(self.notebook_dir.name,'foo','bar','a2.ipynb'))
436 assert os.path.isfile(pjoin(self.notebook_dir.name,'foo','bar','a2.ipynb'))
417 assert not os.path.isfile(pjoin(self.notebook_dir.name, 'foo', 'a.ipynb'))
437 assert not os.path.isfile(pjoin(self.notebook_dir.name, 'foo', 'a.ipynb'))
418 with assert_http_error(404):
438 with assert_http_error(404):
419 self.api.read('a.ipynb', 'foo')
439 self.api.read('a.ipynb', 'foo')
420
440
421 def test_checkpoints(self):
441 def test_checkpoints(self):
422 resp = self.api.read('a.ipynb', 'foo')
442 resp = self.api.read('a.ipynb', 'foo')
423 r = self.api.new_checkpoint('a.ipynb', 'foo')
443 r = self.api.new_checkpoint('a.ipynb', 'foo')
424 self.assertEqual(r.status_code, 201)
444 self.assertEqual(r.status_code, 201)
425 cp1 = r.json()
445 cp1 = r.json()
426 self.assertEqual(set(cp1), {'id', 'last_modified'})
446 self.assertEqual(set(cp1), {'id', 'last_modified'})
427 self.assertEqual(r.headers['Location'].split('/')[-1], cp1['id'])
447 self.assertEqual(r.headers['Location'].split('/')[-1], cp1['id'])
428
448
429 # Modify it
449 # Modify it
430 nbcontent = json.loads(resp.text)['content']
450 nbcontent = json.loads(resp.text)['content']
431 nb = to_notebook_json(nbcontent)
451 nb = to_notebook_json(nbcontent)
432 ws = new_worksheet()
452 ws = new_worksheet()
433 nb.worksheets = [ws]
453 nb.worksheets = [ws]
434 hcell = new_heading_cell('Created by test')
454 hcell = new_heading_cell('Created by test')
435 ws.cells.append(hcell)
455 ws.cells.append(hcell)
436 # Save
456 # Save
437 nbmodel= {'name': 'a.ipynb', 'path':'foo', 'content': nb, 'type': 'notebook'}
457 nbmodel= {'name': 'a.ipynb', 'path':'foo', 'content': nb, 'type': 'notebook'}
438 resp = self.api.save('a.ipynb', path='foo', body=json.dumps(nbmodel))
458 resp = self.api.save('a.ipynb', path='foo', body=json.dumps(nbmodel))
439
459
440 # List checkpoints
460 # List checkpoints
441 cps = self.api.get_checkpoints('a.ipynb', 'foo').json()
461 cps = self.api.get_checkpoints('a.ipynb', 'foo').json()
442 self.assertEqual(cps, [cp1])
462 self.assertEqual(cps, [cp1])
443
463
444 nbcontent = self.api.read('a.ipynb', 'foo').json()['content']
464 nbcontent = self.api.read('a.ipynb', 'foo').json()['content']
445 nb = to_notebook_json(nbcontent)
465 nb = to_notebook_json(nbcontent)
446 self.assertEqual(nb.worksheets[0].cells[0].source, 'Created by test')
466 self.assertEqual(nb.worksheets[0].cells[0].source, 'Created by test')
447
467
448 # Restore cp1
468 # Restore cp1
449 r = self.api.restore_checkpoint('a.ipynb', 'foo', cp1['id'])
469 r = self.api.restore_checkpoint('a.ipynb', 'foo', cp1['id'])
450 self.assertEqual(r.status_code, 204)
470 self.assertEqual(r.status_code, 204)
451 nbcontent = self.api.read('a.ipynb', 'foo').json()['content']
471 nbcontent = self.api.read('a.ipynb', 'foo').json()['content']
452 nb = to_notebook_json(nbcontent)
472 nb = to_notebook_json(nbcontent)
453 self.assertEqual(nb.worksheets, [])
473 self.assertEqual(nb.worksheets, [])
454
474
455 # Delete cp1
475 # Delete cp1
456 r = self.api.delete_checkpoint('a.ipynb', 'foo', cp1['id'])
476 r = self.api.delete_checkpoint('a.ipynb', 'foo', cp1['id'])
457 self.assertEqual(r.status_code, 204)
477 self.assertEqual(r.status_code, 204)
458 cps = self.api.get_checkpoints('a.ipynb', 'foo').json()
478 cps = self.api.get_checkpoints('a.ipynb', 'foo').json()
459 self.assertEqual(cps, [])
479 self.assertEqual(cps, [])
General Comments 0
You need to be logged in to leave comments. Login now