##// END OF EJS Templates
Merge pull request #6915 from minrk/contents-no-0...
Thomas Kluyver -
r18835:b7c85f43 merge
parent child Browse files
Show More
@@ -1,373 +1,380 b''
1 """A base class for contents managers."""
1 """A base class for contents managers."""
2
2
3 # Copyright (c) IPython Development Team.
3 # Copyright (c) IPython Development Team.
4 # Distributed under the terms of the Modified BSD License.
4 # Distributed under the terms of the Modified BSD License.
5
5
6 from fnmatch import fnmatch
6 from fnmatch import fnmatch
7 import itertools
7 import itertools
8 import json
8 import json
9 import os
9 import os
10 import re
10
11
11 from tornado.web import HTTPError
12 from tornado.web import HTTPError
12
13
13 from IPython.config.configurable import LoggingConfigurable
14 from IPython.config.configurable import LoggingConfigurable
14 from IPython.nbformat import sign, validate, ValidationError
15 from IPython.nbformat import sign, validate, ValidationError
15 from IPython.nbformat.v4 import new_notebook
16 from IPython.nbformat.v4 import new_notebook
16 from IPython.utils.traitlets import Instance, Unicode, List
17 from IPython.utils.traitlets import Instance, Unicode, List
17
18
19 copy_pat = re.compile(r'\-Copy\d*\.')
18
20
19 class ContentsManager(LoggingConfigurable):
21 class ContentsManager(LoggingConfigurable):
20 """Base class for serving files and directories.
22 """Base class for serving files and directories.
21
23
22 This serves any text or binary file,
24 This serves any text or binary file,
23 as well as directories,
25 as well as directories,
24 with special handling for JSON notebook documents.
26 with special handling for JSON notebook documents.
25
27
26 Most APIs take a path argument,
28 Most APIs take a path argument,
27 which is always an API-style unicode path,
29 which is always an API-style unicode path,
28 and always refers to a directory.
30 and always refers to a directory.
29
31
30 - unicode, not url-escaped
32 - unicode, not url-escaped
31 - '/'-separated
33 - '/'-separated
32 - leading and trailing '/' will be stripped
34 - leading and trailing '/' will be stripped
33 - if unspecified, path defaults to '',
35 - if unspecified, path defaults to '',
34 indicating the root path.
36 indicating the root path.
35
37
36 """
38 """
37
39
38 notary = Instance(sign.NotebookNotary)
40 notary = Instance(sign.NotebookNotary)
39 def _notary_default(self):
41 def _notary_default(self):
40 return sign.NotebookNotary(parent=self)
42 return sign.NotebookNotary(parent=self)
41
43
42 hide_globs = List(Unicode, [
44 hide_globs = List(Unicode, [
43 u'__pycache__', '*.pyc', '*.pyo',
45 u'__pycache__', '*.pyc', '*.pyo',
44 '.DS_Store', '*.so', '*.dylib', '*~',
46 '.DS_Store', '*.so', '*.dylib', '*~',
45 ], config=True, help="""
47 ], config=True, help="""
46 Glob patterns to hide in file and directory listings.
48 Glob patterns to hide in file and directory listings.
47 """)
49 """)
48
50
49 untitled_notebook = Unicode("Untitled", config=True,
51 untitled_notebook = Unicode("Untitled", config=True,
50 help="The base name used when creating untitled notebooks."
52 help="The base name used when creating untitled notebooks."
51 )
53 )
52
54
53 untitled_file = Unicode("untitled", config=True,
55 untitled_file = Unicode("untitled", config=True,
54 help="The base name used when creating untitled files."
56 help="The base name used when creating untitled files."
55 )
57 )
56
58
57 untitled_directory = Unicode("Untitled Folder", config=True,
59 untitled_directory = Unicode("Untitled Folder", config=True,
58 help="The base name used when creating untitled directories."
60 help="The base name used when creating untitled directories."
59 )
61 )
60
62
61 # ContentsManager API part 1: methods that must be
63 # ContentsManager API part 1: methods that must be
62 # implemented in subclasses.
64 # implemented in subclasses.
63
65
64 def dir_exists(self, path):
66 def dir_exists(self, path):
65 """Does the API-style path (directory) actually exist?
67 """Does the API-style path (directory) actually exist?
66
68
67 Like os.path.isdir
69 Like os.path.isdir
68
70
69 Override this method in subclasses.
71 Override this method in subclasses.
70
72
71 Parameters
73 Parameters
72 ----------
74 ----------
73 path : string
75 path : string
74 The path to check
76 The path to check
75
77
76 Returns
78 Returns
77 -------
79 -------
78 exists : bool
80 exists : bool
79 Whether the path does indeed exist.
81 Whether the path does indeed exist.
80 """
82 """
81 raise NotImplementedError
83 raise NotImplementedError
82
84
83 def is_hidden(self, path):
85 def is_hidden(self, path):
84 """Does the API style path correspond to a hidden directory or file?
86 """Does the API style path correspond to a hidden directory or file?
85
87
86 Parameters
88 Parameters
87 ----------
89 ----------
88 path : string
90 path : string
89 The path to check. This is an API path (`/` separated,
91 The path to check. This is an API path (`/` separated,
90 relative to root dir).
92 relative to root dir).
91
93
92 Returns
94 Returns
93 -------
95 -------
94 hidden : bool
96 hidden : bool
95 Whether the path is hidden.
97 Whether the path is hidden.
96
98
97 """
99 """
98 raise NotImplementedError
100 raise NotImplementedError
99
101
100 def file_exists(self, path=''):
102 def file_exists(self, path=''):
101 """Does a file exist at the given path?
103 """Does a file exist at the given path?
102
104
103 Like os.path.isfile
105 Like os.path.isfile
104
106
105 Override this method in subclasses.
107 Override this method in subclasses.
106
108
107 Parameters
109 Parameters
108 ----------
110 ----------
109 name : string
111 name : string
110 The name of the file you are checking.
112 The name of the file you are checking.
111 path : string
113 path : string
112 The relative path to the file's directory (with '/' as separator)
114 The relative path to the file's directory (with '/' as separator)
113
115
114 Returns
116 Returns
115 -------
117 -------
116 exists : bool
118 exists : bool
117 Whether the file exists.
119 Whether the file exists.
118 """
120 """
119 raise NotImplementedError('must be implemented in a subclass')
121 raise NotImplementedError('must be implemented in a subclass')
120
122
121 def exists(self, path):
123 def exists(self, path):
122 """Does a file or directory exist at the given path?
124 """Does a file or directory exist at the given path?
123
125
124 Like os.path.exists
126 Like os.path.exists
125
127
126 Parameters
128 Parameters
127 ----------
129 ----------
128 path : string
130 path : string
129 The relative path to the file's directory (with '/' as separator)
131 The relative path to the file's directory (with '/' as separator)
130
132
131 Returns
133 Returns
132 -------
134 -------
133 exists : bool
135 exists : bool
134 Whether the target exists.
136 Whether the target exists.
135 """
137 """
136 return self.file_exists(path) or self.dir_exists(path)
138 return self.file_exists(path) or self.dir_exists(path)
137
139
138 def get(self, path, content=True, type_=None, format=None):
140 def get(self, path, content=True, type_=None, format=None):
139 """Get the model of a file or directory with or without content."""
141 """Get the model of a file or directory with or without content."""
140 raise NotImplementedError('must be implemented in a subclass')
142 raise NotImplementedError('must be implemented in a subclass')
141
143
142 def save(self, model, path):
144 def save(self, model, path):
143 """Save the file or directory and return the model with no content."""
145 """Save the file or directory and return the model with no content."""
144 raise NotImplementedError('must be implemented in a subclass')
146 raise NotImplementedError('must be implemented in a subclass')
145
147
146 def update(self, model, path):
148 def update(self, model, path):
147 """Update the file or directory and return the model with no content.
149 """Update the file or directory and return the model with no content.
148
150
149 For use in PATCH requests, to enable renaming a file without
151 For use in PATCH requests, to enable renaming a file without
150 re-uploading its contents. Only used for renaming at the moment.
152 re-uploading its contents. Only used for renaming at the moment.
151 """
153 """
152 raise NotImplementedError('must be implemented in a subclass')
154 raise NotImplementedError('must be implemented in a subclass')
153
155
154 def delete(self, path):
156 def delete(self, path):
155 """Delete file or directory by path."""
157 """Delete file or directory by path."""
156 raise NotImplementedError('must be implemented in a subclass')
158 raise NotImplementedError('must be implemented in a subclass')
157
159
158 def create_checkpoint(self, path):
160 def create_checkpoint(self, path):
159 """Create a checkpoint of the current state of a file
161 """Create a checkpoint of the current state of a file
160
162
161 Returns a checkpoint_id for the new checkpoint.
163 Returns a checkpoint_id for the new checkpoint.
162 """
164 """
163 raise NotImplementedError("must be implemented in a subclass")
165 raise NotImplementedError("must be implemented in a subclass")
164
166
165 def list_checkpoints(self, path):
167 def list_checkpoints(self, path):
166 """Return a list of checkpoints for a given file"""
168 """Return a list of checkpoints for a given file"""
167 return []
169 return []
168
170
169 def restore_checkpoint(self, checkpoint_id, path):
171 def restore_checkpoint(self, checkpoint_id, path):
170 """Restore a file from one of its checkpoints"""
172 """Restore a file from one of its checkpoints"""
171 raise NotImplementedError("must be implemented in a subclass")
173 raise NotImplementedError("must be implemented in a subclass")
172
174
173 def delete_checkpoint(self, checkpoint_id, path):
175 def delete_checkpoint(self, checkpoint_id, path):
174 """delete a checkpoint for a file"""
176 """delete a checkpoint for a file"""
175 raise NotImplementedError("must be implemented in a subclass")
177 raise NotImplementedError("must be implemented in a subclass")
176
178
177 # ContentsManager API part 2: methods that have useable default
179 # ContentsManager API part 2: methods that have useable default
178 # implementations, but can be overridden in subclasses.
180 # implementations, but can be overridden in subclasses.
179
181
180 def info_string(self):
182 def info_string(self):
181 return "Serving contents"
183 return "Serving contents"
182
184
183 def get_kernel_path(self, path, model=None):
185 def get_kernel_path(self, path, model=None):
184 """Return the API path for the kernel
186 """Return the API path for the kernel
185
187
186 KernelManagers can turn this value into a filesystem path,
188 KernelManagers can turn this value into a filesystem path,
187 or ignore it altogether.
189 or ignore it altogether.
188 """
190 """
189 return path
191 return path
190
192
191 def increment_filename(self, filename, path=''):
193 def increment_filename(self, filename, path='', insert=''):
192 """Increment a filename until it is unique.
194 """Increment a filename until it is unique.
193
195
194 Parameters
196 Parameters
195 ----------
197 ----------
196 filename : unicode
198 filename : unicode
197 The name of a file, including extension
199 The name of a file, including extension
198 path : unicode
200 path : unicode
199 The API path of the target's directory
201 The API path of the target's directory
200
202
201 Returns
203 Returns
202 -------
204 -------
203 name : unicode
205 name : unicode
204 A filename that is unique, based on the input filename.
206 A filename that is unique, based on the input filename.
205 """
207 """
206 path = path.strip('/')
208 path = path.strip('/')
207 basename, ext = os.path.splitext(filename)
209 basename, ext = os.path.splitext(filename)
208 for i in itertools.count():
210 for i in itertools.count():
209 name = u'{basename}{i}{ext}'.format(basename=basename, i=i,
211 if i:
210 ext=ext)
212 insert_i = '{}{}'.format(insert, i)
213 else:
214 insert_i = ''
215 name = u'{basename}{insert}{ext}'.format(basename=basename,
216 insert=insert_i, ext=ext)
211 if not self.exists(u'{}/{}'.format(path, name)):
217 if not self.exists(u'{}/{}'.format(path, name)):
212 break
218 break
213 return name
219 return name
214
220
215 def validate_notebook_model(self, model):
221 def validate_notebook_model(self, model):
216 """Add failed-validation message to model"""
222 """Add failed-validation message to model"""
217 try:
223 try:
218 validate(model['content'])
224 validate(model['content'])
219 except ValidationError as e:
225 except ValidationError as e:
220 model['message'] = u'Notebook Validation failed: {}:\n{}'.format(
226 model['message'] = u'Notebook Validation failed: {}:\n{}'.format(
221 e.message, json.dumps(e.instance, indent=1, default=lambda obj: '<UNKNOWN>'),
227 e.message, json.dumps(e.instance, indent=1, default=lambda obj: '<UNKNOWN>'),
222 )
228 )
223 return model
229 return model
224
230
225 def new_untitled(self, path='', type='', ext=''):
231 def new_untitled(self, path='', type='', ext=''):
226 """Create a new untitled file or directory in path
232 """Create a new untitled file or directory in path
227
233
228 path must be a directory
234 path must be a directory
229
235
230 File extension can be specified.
236 File extension can be specified.
231
237
232 Use `new` to create files with a fully specified path (including filename).
238 Use `new` to create files with a fully specified path (including filename).
233 """
239 """
234 path = path.strip('/')
240 path = path.strip('/')
235 if not self.dir_exists(path):
241 if not self.dir_exists(path):
236 raise HTTPError(404, 'No such directory: %s' % path)
242 raise HTTPError(404, 'No such directory: %s' % path)
237
243
238 model = {}
244 model = {}
239 if type:
245 if type:
240 model['type'] = type
246 model['type'] = type
241
247
242 if ext == '.ipynb':
248 if ext == '.ipynb':
243 model.setdefault('type', 'notebook')
249 model.setdefault('type', 'notebook')
244 else:
250 else:
245 model.setdefault('type', 'file')
251 model.setdefault('type', 'file')
246
252
253 insert = ''
247 if model['type'] == 'directory':
254 if model['type'] == 'directory':
248 untitled = self.untitled_directory
255 untitled = self.untitled_directory
256 insert = ' '
249 elif model['type'] == 'notebook':
257 elif model['type'] == 'notebook':
250 untitled = self.untitled_notebook
258 untitled = self.untitled_notebook
251 ext = '.ipynb'
259 ext = '.ipynb'
252 elif model['type'] == 'file':
260 elif model['type'] == 'file':
253 untitled = self.untitled_file
261 untitled = self.untitled_file
254 else:
262 else:
255 raise HTTPError(400, "Unexpected model type: %r" % model['type'])
263 raise HTTPError(400, "Unexpected model type: %r" % model['type'])
256
264
257 name = self.increment_filename(untitled + ext, path)
265 name = self.increment_filename(untitled + ext, path, insert=insert)
258 path = u'{0}/{1}'.format(path, name)
266 path = u'{0}/{1}'.format(path, name)
259 return self.new(model, path)
267 return self.new(model, path)
260
268
261 def new(self, model=None, path=''):
269 def new(self, model=None, path=''):
262 """Create a new file or directory and return its model with no content.
270 """Create a new file or directory and return its model with no content.
263
271
264 To create a new untitled entity in a directory, use `new_untitled`.
272 To create a new untitled entity in a directory, use `new_untitled`.
265 """
273 """
266 path = path.strip('/')
274 path = path.strip('/')
267 if model is None:
275 if model is None:
268 model = {}
276 model = {}
269
277
270 if path.endswith('.ipynb'):
278 if path.endswith('.ipynb'):
271 model.setdefault('type', 'notebook')
279 model.setdefault('type', 'notebook')
272 else:
280 else:
273 model.setdefault('type', 'file')
281 model.setdefault('type', 'file')
274
282
275 # no content, not a directory, so fill out new-file model
283 # no content, not a directory, so fill out new-file model
276 if 'content' not in model and model['type'] != 'directory':
284 if 'content' not in model and model['type'] != 'directory':
277 if model['type'] == 'notebook':
285 if model['type'] == 'notebook':
278 model['content'] = new_notebook()
286 model['content'] = new_notebook()
279 model['format'] = 'json'
287 model['format'] = 'json'
280 else:
288 else:
281 model['content'] = ''
289 model['content'] = ''
282 model['type'] = 'file'
290 model['type'] = 'file'
283 model['format'] = 'text'
291 model['format'] = 'text'
284
292
285 model = self.save(model, path)
293 model = self.save(model, path)
286 return model
294 return model
287
295
288 def copy(self, from_path, to_path=None):
296 def copy(self, from_path, to_path=None):
289 """Copy an existing file and return its new model.
297 """Copy an existing file and return its new model.
290
298
291 If to_path not specified, it will be the parent directory of from_path.
299 If to_path not specified, it will be the parent directory of from_path.
292 If to_path is a directory, filename will increment `from_path-Copy#.ext`.
300 If to_path is a directory, filename will increment `from_path-Copy#.ext`.
293
301
294 from_path must be a full path to a file.
302 from_path must be a full path to a file.
295 """
303 """
296 path = from_path.strip('/')
304 path = from_path.strip('/')
297 if '/' in path:
305 if '/' in path:
298 from_dir, from_name = path.rsplit('/', 1)
306 from_dir, from_name = path.rsplit('/', 1)
299 else:
307 else:
300 from_dir = ''
308 from_dir = ''
301 from_name = path
309 from_name = path
302
310
303 model = self.get(path)
311 model = self.get(path)
304 model.pop('path', None)
312 model.pop('path', None)
305 model.pop('name', None)
313 model.pop('name', None)
306 if model['type'] == 'directory':
314 if model['type'] == 'directory':
307 raise HTTPError(400, "Can't copy directories")
315 raise HTTPError(400, "Can't copy directories")
308
316
309 if not to_path:
317 if not to_path:
310 to_path = from_dir
318 to_path = from_dir
311 if self.dir_exists(to_path):
319 if self.dir_exists(to_path):
312 base, ext = os.path.splitext(from_name)
320 name = copy_pat.sub(u'.', from_name)
313 copy_name = u'{0}-Copy{1}'.format(base, ext)
321 to_name = self.increment_filename(name, to_path, insert='-Copy')
314 to_name = self.increment_filename(copy_name, to_path)
315 to_path = u'{0}/{1}'.format(to_path, to_name)
322 to_path = u'{0}/{1}'.format(to_path, to_name)
316
323
317 model = self.save(model, to_path)
324 model = self.save(model, to_path)
318 return model
325 return model
319
326
320 def log_info(self):
327 def log_info(self):
321 self.log.info(self.info_string())
328 self.log.info(self.info_string())
322
329
323 def trust_notebook(self, path):
330 def trust_notebook(self, path):
324 """Explicitly trust a notebook
331 """Explicitly trust a notebook
325
332
326 Parameters
333 Parameters
327 ----------
334 ----------
328 path : string
335 path : string
329 The path of a notebook
336 The path of a notebook
330 """
337 """
331 model = self.get(path)
338 model = self.get(path)
332 nb = model['content']
339 nb = model['content']
333 self.log.warn("Trusting notebook %s", path)
340 self.log.warn("Trusting notebook %s", path)
334 self.notary.mark_cells(nb, True)
341 self.notary.mark_cells(nb, True)
335 self.save(model, path)
342 self.save(model, path)
336
343
337 def check_and_sign(self, nb, path=''):
344 def check_and_sign(self, nb, path=''):
338 """Check for trusted cells, and sign the notebook.
345 """Check for trusted cells, and sign the notebook.
339
346
340 Called as a part of saving notebooks.
347 Called as a part of saving notebooks.
341
348
342 Parameters
349 Parameters
343 ----------
350 ----------
344 nb : dict
351 nb : dict
345 The notebook dict
352 The notebook dict
346 path : string
353 path : string
347 The notebook's path (for logging)
354 The notebook's path (for logging)
348 """
355 """
349 if self.notary.check_cells(nb):
356 if self.notary.check_cells(nb):
350 self.notary.sign(nb)
357 self.notary.sign(nb)
351 else:
358 else:
352 self.log.warn("Saving untrusted notebook %s", path)
359 self.log.warn("Saving untrusted notebook %s", path)
353
360
354 def mark_trusted_cells(self, nb, path=''):
361 def mark_trusted_cells(self, nb, path=''):
355 """Mark cells as trusted if the notebook signature matches.
362 """Mark cells as trusted if the notebook signature matches.
356
363
357 Called as a part of loading notebooks.
364 Called as a part of loading notebooks.
358
365
359 Parameters
366 Parameters
360 ----------
367 ----------
361 nb : dict
368 nb : dict
362 The notebook object (in current nbformat)
369 The notebook object (in current nbformat)
363 path : string
370 path : string
364 The notebook's path (for logging)
371 The notebook's path (for logging)
365 """
372 """
366 trusted = self.notary.check_signature(nb)
373 trusted = self.notary.check_signature(nb)
367 if not trusted:
374 if not trusted:
368 self.log.warn("Notebook %s is not trusted", path)
375 self.log.warn("Notebook %s is not trusted", path)
369 self.notary.mark_cells(nb, trusted)
376 self.notary.mark_cells(nb, trusted)
370
377
371 def should_list(self, name):
378 def should_list(self, name):
372 """Should this file/directory name be displayed in a listing?"""
379 """Should this file/directory name be displayed in a listing?"""
373 return not any(fnmatch(name, glob) for glob in self.hide_globs)
380 return not any(fnmatch(name, glob) for glob in self.hide_globs)
@@ -1,511 +1,521 b''
1 # coding: utf-8
1 # coding: utf-8
2 """Test the contents webservice API."""
2 """Test the contents webservice API."""
3
3
4 import base64
4 import base64
5 import io
5 import io
6 import json
6 import json
7 import os
7 import os
8 import shutil
8 import shutil
9 from unicodedata import normalize
9 from unicodedata import normalize
10
10
11 pjoin = os.path.join
11 pjoin = os.path.join
12
12
13 import requests
13 import requests
14
14
15 from IPython.html.utils import url_path_join, url_escape
15 from IPython.html.utils import url_path_join, url_escape
16 from IPython.html.tests.launchnotebook import NotebookTestBase, assert_http_error
16 from IPython.html.tests.launchnotebook import NotebookTestBase, assert_http_error
17 from IPython.nbformat import read, write, from_dict
17 from IPython.nbformat import read, write, from_dict
18 from IPython.nbformat.v4 import (
18 from IPython.nbformat.v4 import (
19 new_notebook, new_markdown_cell,
19 new_notebook, new_markdown_cell,
20 )
20 )
21 from IPython.nbformat import v2
21 from IPython.nbformat import v2
22 from IPython.utils import py3compat
22 from IPython.utils import py3compat
23 from IPython.utils.data import uniq_stable
23 from IPython.utils.data import uniq_stable
24
24
25
25
26 def notebooks_only(dir_model):
26 def notebooks_only(dir_model):
27 return [nb for nb in dir_model['content'] if nb['type']=='notebook']
27 return [nb for nb in dir_model['content'] if nb['type']=='notebook']
28
28
29 def dirs_only(dir_model):
29 def dirs_only(dir_model):
30 return [x for x in dir_model['content'] if x['type']=='directory']
30 return [x for x in dir_model['content'] if x['type']=='directory']
31
31
32
32
33 class API(object):
33 class API(object):
34 """Wrapper for contents API calls."""
34 """Wrapper for contents API calls."""
35 def __init__(self, base_url):
35 def __init__(self, base_url):
36 self.base_url = base_url
36 self.base_url = base_url
37
37
38 def _req(self, verb, path, body=None, params=None):
38 def _req(self, verb, path, body=None, params=None):
39 response = requests.request(verb,
39 response = requests.request(verb,
40 url_path_join(self.base_url, 'api/contents', path),
40 url_path_join(self.base_url, 'api/contents', path),
41 data=body, params=params,
41 data=body, params=params,
42 )
42 )
43 response.raise_for_status()
43 response.raise_for_status()
44 return response
44 return response
45
45
46 def list(self, path='/'):
46 def list(self, path='/'):
47 return self._req('GET', path)
47 return self._req('GET', path)
48
48
49 def read(self, path, type_=None, format=None):
49 def read(self, path, type_=None, format=None):
50 params = {}
50 params = {}
51 if type_ is not None:
51 if type_ is not None:
52 params['type'] = type_
52 params['type'] = type_
53 if format is not None:
53 if format is not None:
54 params['format'] = format
54 params['format'] = format
55 return self._req('GET', path, params=params)
55 return self._req('GET', path, params=params)
56
56
57 def create_untitled(self, path='/', ext='.ipynb'):
57 def create_untitled(self, path='/', ext='.ipynb'):
58 body = None
58 body = None
59 if ext:
59 if ext:
60 body = json.dumps({'ext': ext})
60 body = json.dumps({'ext': ext})
61 return self._req('POST', path, body)
61 return self._req('POST', path, body)
62
62
63 def mkdir_untitled(self, path='/'):
63 def mkdir_untitled(self, path='/'):
64 return self._req('POST', path, json.dumps({'type': 'directory'}))
64 return self._req('POST', path, json.dumps({'type': 'directory'}))
65
65
66 def copy(self, copy_from, path='/'):
66 def copy(self, copy_from, path='/'):
67 body = json.dumps({'copy_from':copy_from})
67 body = json.dumps({'copy_from':copy_from})
68 return self._req('POST', path, body)
68 return self._req('POST', path, body)
69
69
70 def create(self, path='/'):
70 def create(self, path='/'):
71 return self._req('PUT', path)
71 return self._req('PUT', path)
72
72
73 def upload(self, path, body):
73 def upload(self, path, body):
74 return self._req('PUT', path, body)
74 return self._req('PUT', path, body)
75
75
76 def mkdir_untitled(self, path='/'):
76 def mkdir_untitled(self, path='/'):
77 return self._req('POST', path, json.dumps({'type': 'directory'}))
77 return self._req('POST', path, json.dumps({'type': 'directory'}))
78
78
79 def mkdir(self, path='/'):
79 def mkdir(self, path='/'):
80 return self._req('PUT', path, json.dumps({'type': 'directory'}))
80 return self._req('PUT', path, json.dumps({'type': 'directory'}))
81
81
82 def copy_put(self, copy_from, path='/'):
82 def copy_put(self, copy_from, path='/'):
83 body = json.dumps({'copy_from':copy_from})
83 body = json.dumps({'copy_from':copy_from})
84 return self._req('PUT', path, body)
84 return self._req('PUT', path, body)
85
85
86 def save(self, path, body):
86 def save(self, path, body):
87 return self._req('PUT', path, body)
87 return self._req('PUT', path, body)
88
88
89 def delete(self, path='/'):
89 def delete(self, path='/'):
90 return self._req('DELETE', path)
90 return self._req('DELETE', path)
91
91
92 def rename(self, path, new_path):
92 def rename(self, path, new_path):
93 body = json.dumps({'path': new_path})
93 body = json.dumps({'path': new_path})
94 return self._req('PATCH', path, body)
94 return self._req('PATCH', path, body)
95
95
96 def get_checkpoints(self, path):
96 def get_checkpoints(self, path):
97 return self._req('GET', url_path_join(path, 'checkpoints'))
97 return self._req('GET', url_path_join(path, 'checkpoints'))
98
98
99 def new_checkpoint(self, path):
99 def new_checkpoint(self, path):
100 return self._req('POST', url_path_join(path, 'checkpoints'))
100 return self._req('POST', url_path_join(path, 'checkpoints'))
101
101
102 def restore_checkpoint(self, path, checkpoint_id):
102 def restore_checkpoint(self, path, checkpoint_id):
103 return self._req('POST', url_path_join(path, 'checkpoints', checkpoint_id))
103 return self._req('POST', url_path_join(path, 'checkpoints', checkpoint_id))
104
104
105 def delete_checkpoint(self, path, checkpoint_id):
105 def delete_checkpoint(self, path, checkpoint_id):
106 return self._req('DELETE', url_path_join(path, 'checkpoints', checkpoint_id))
106 return self._req('DELETE', url_path_join(path, 'checkpoints', checkpoint_id))
107
107
108 class APITest(NotebookTestBase):
108 class APITest(NotebookTestBase):
109 """Test the kernels web service API"""
109 """Test the kernels web service API"""
110 dirs_nbs = [('', 'inroot'),
110 dirs_nbs = [('', 'inroot'),
111 ('Directory with spaces in', 'inspace'),
111 ('Directory with spaces in', 'inspace'),
112 (u'unicodΓ©', 'innonascii'),
112 (u'unicodΓ©', 'innonascii'),
113 ('foo', 'a'),
113 ('foo', 'a'),
114 ('foo', 'b'),
114 ('foo', 'b'),
115 ('foo', 'name with spaces'),
115 ('foo', 'name with spaces'),
116 ('foo', u'unicodΓ©'),
116 ('foo', u'unicodΓ©'),
117 ('foo/bar', 'baz'),
117 ('foo/bar', 'baz'),
118 ('ordering', 'A'),
118 ('ordering', 'A'),
119 ('ordering', 'b'),
119 ('ordering', 'b'),
120 ('ordering', 'C'),
120 ('ordering', 'C'),
121 (u'Γ₯ b', u'Γ§ d'),
121 (u'Γ₯ b', u'Γ§ d'),
122 ]
122 ]
123 hidden_dirs = ['.hidden', '__pycache__']
123 hidden_dirs = ['.hidden', '__pycache__']
124
124
125 dirs = uniq_stable([py3compat.cast_unicode(d) for (d,n) in dirs_nbs])
125 dirs = uniq_stable([py3compat.cast_unicode(d) for (d,n) in dirs_nbs])
126 del dirs[0] # remove ''
126 del dirs[0] # remove ''
127 top_level_dirs = {normalize('NFC', d.split('/')[0]) for d in dirs}
127 top_level_dirs = {normalize('NFC', d.split('/')[0]) for d in dirs}
128
128
129 @staticmethod
129 @staticmethod
130 def _blob_for_name(name):
130 def _blob_for_name(name):
131 return name.encode('utf-8') + b'\xFF'
131 return name.encode('utf-8') + b'\xFF'
132
132
133 @staticmethod
133 @staticmethod
134 def _txt_for_name(name):
134 def _txt_for_name(name):
135 return u'%s text file' % name
135 return u'%s text file' % name
136
136
137 def setUp(self):
137 def setUp(self):
138 nbdir = self.notebook_dir.name
138 nbdir = self.notebook_dir.name
139 self.blob = os.urandom(100)
139 self.blob = os.urandom(100)
140 self.b64_blob = base64.encodestring(self.blob).decode('ascii')
140 self.b64_blob = base64.encodestring(self.blob).decode('ascii')
141
141
142 for d in (self.dirs + self.hidden_dirs):
142 for d in (self.dirs + self.hidden_dirs):
143 d.replace('/', os.sep)
143 d.replace('/', os.sep)
144 if not os.path.isdir(pjoin(nbdir, d)):
144 if not os.path.isdir(pjoin(nbdir, d)):
145 os.mkdir(pjoin(nbdir, d))
145 os.mkdir(pjoin(nbdir, d))
146
146
147 for d, name in self.dirs_nbs:
147 for d, name in self.dirs_nbs:
148 d = d.replace('/', os.sep)
148 d = d.replace('/', os.sep)
149 # create a notebook
149 # create a notebook
150 with io.open(pjoin(nbdir, d, '%s.ipynb' % name), 'w',
150 with io.open(pjoin(nbdir, d, '%s.ipynb' % name), 'w',
151 encoding='utf-8') as f:
151 encoding='utf-8') as f:
152 nb = new_notebook()
152 nb = new_notebook()
153 write(nb, f, version=4)
153 write(nb, f, version=4)
154
154
155 # create a text file
155 # create a text file
156 with io.open(pjoin(nbdir, d, '%s.txt' % name), 'w',
156 with io.open(pjoin(nbdir, d, '%s.txt' % name), 'w',
157 encoding='utf-8') as f:
157 encoding='utf-8') as f:
158 f.write(self._txt_for_name(name))
158 f.write(self._txt_for_name(name))
159
159
160 # create a binary file
160 # create a binary file
161 with io.open(pjoin(nbdir, d, '%s.blob' % name), 'wb') as f:
161 with io.open(pjoin(nbdir, d, '%s.blob' % name), 'wb') as f:
162 f.write(self._blob_for_name(name))
162 f.write(self._blob_for_name(name))
163
163
164 self.api = API(self.base_url())
164 self.api = API(self.base_url())
165
165
166 def tearDown(self):
166 def tearDown(self):
167 nbdir = self.notebook_dir.name
167 nbdir = self.notebook_dir.name
168
168
169 for dname in (list(self.top_level_dirs) + self.hidden_dirs):
169 for dname in (list(self.top_level_dirs) + self.hidden_dirs):
170 shutil.rmtree(pjoin(nbdir, dname), ignore_errors=True)
170 shutil.rmtree(pjoin(nbdir, dname), ignore_errors=True)
171
171
172 if os.path.isfile(pjoin(nbdir, 'inroot.ipynb')):
172 if os.path.isfile(pjoin(nbdir, 'inroot.ipynb')):
173 os.unlink(pjoin(nbdir, 'inroot.ipynb'))
173 os.unlink(pjoin(nbdir, 'inroot.ipynb'))
174
174
175 def test_list_notebooks(self):
175 def test_list_notebooks(self):
176 nbs = notebooks_only(self.api.list().json())
176 nbs = notebooks_only(self.api.list().json())
177 self.assertEqual(len(nbs), 1)
177 self.assertEqual(len(nbs), 1)
178 self.assertEqual(nbs[0]['name'], 'inroot.ipynb')
178 self.assertEqual(nbs[0]['name'], 'inroot.ipynb')
179
179
180 nbs = notebooks_only(self.api.list('/Directory with spaces in/').json())
180 nbs = notebooks_only(self.api.list('/Directory with spaces in/').json())
181 self.assertEqual(len(nbs), 1)
181 self.assertEqual(len(nbs), 1)
182 self.assertEqual(nbs[0]['name'], 'inspace.ipynb')
182 self.assertEqual(nbs[0]['name'], 'inspace.ipynb')
183
183
184 nbs = notebooks_only(self.api.list(u'/unicodΓ©/').json())
184 nbs = notebooks_only(self.api.list(u'/unicodΓ©/').json())
185 self.assertEqual(len(nbs), 1)
185 self.assertEqual(len(nbs), 1)
186 self.assertEqual(nbs[0]['name'], 'innonascii.ipynb')
186 self.assertEqual(nbs[0]['name'], 'innonascii.ipynb')
187 self.assertEqual(nbs[0]['path'], u'unicodΓ©/innonascii.ipynb')
187 self.assertEqual(nbs[0]['path'], u'unicodΓ©/innonascii.ipynb')
188
188
189 nbs = notebooks_only(self.api.list('/foo/bar/').json())
189 nbs = notebooks_only(self.api.list('/foo/bar/').json())
190 self.assertEqual(len(nbs), 1)
190 self.assertEqual(len(nbs), 1)
191 self.assertEqual(nbs[0]['name'], 'baz.ipynb')
191 self.assertEqual(nbs[0]['name'], 'baz.ipynb')
192 self.assertEqual(nbs[0]['path'], 'foo/bar/baz.ipynb')
192 self.assertEqual(nbs[0]['path'], 'foo/bar/baz.ipynb')
193
193
194 nbs = notebooks_only(self.api.list('foo').json())
194 nbs = notebooks_only(self.api.list('foo').json())
195 self.assertEqual(len(nbs), 4)
195 self.assertEqual(len(nbs), 4)
196 nbnames = { normalize('NFC', n['name']) for n in nbs }
196 nbnames = { normalize('NFC', n['name']) for n in nbs }
197 expected = [ u'a.ipynb', u'b.ipynb', u'name with spaces.ipynb', u'unicodΓ©.ipynb']
197 expected = [ u'a.ipynb', u'b.ipynb', u'name with spaces.ipynb', u'unicodΓ©.ipynb']
198 expected = { normalize('NFC', name) for name in expected }
198 expected = { normalize('NFC', name) for name in expected }
199 self.assertEqual(nbnames, expected)
199 self.assertEqual(nbnames, expected)
200
200
201 nbs = notebooks_only(self.api.list('ordering').json())
201 nbs = notebooks_only(self.api.list('ordering').json())
202 nbnames = [n['name'] for n in nbs]
202 nbnames = [n['name'] for n in nbs]
203 expected = ['A.ipynb', 'b.ipynb', 'C.ipynb']
203 expected = ['A.ipynb', 'b.ipynb', 'C.ipynb']
204 self.assertEqual(nbnames, expected)
204 self.assertEqual(nbnames, expected)
205
205
206 def test_list_dirs(self):
206 def test_list_dirs(self):
207 print(self.api.list().json())
207 print(self.api.list().json())
208 dirs = dirs_only(self.api.list().json())
208 dirs = dirs_only(self.api.list().json())
209 dir_names = {normalize('NFC', d['name']) for d in dirs}
209 dir_names = {normalize('NFC', d['name']) for d in dirs}
210 print(dir_names)
210 print(dir_names)
211 print(self.top_level_dirs)
211 print(self.top_level_dirs)
212 self.assertEqual(dir_names, self.top_level_dirs) # Excluding hidden dirs
212 self.assertEqual(dir_names, self.top_level_dirs) # Excluding hidden dirs
213
213
214 def test_list_nonexistant_dir(self):
214 def test_list_nonexistant_dir(self):
215 with assert_http_error(404):
215 with assert_http_error(404):
216 self.api.list('nonexistant')
216 self.api.list('nonexistant')
217
217
218 def test_get_nb_contents(self):
218 def test_get_nb_contents(self):
219 for d, name in self.dirs_nbs:
219 for d, name in self.dirs_nbs:
220 path = url_path_join(d, name + '.ipynb')
220 path = url_path_join(d, name + '.ipynb')
221 nb = self.api.read(path).json()
221 nb = self.api.read(path).json()
222 self.assertEqual(nb['name'], u'%s.ipynb' % name)
222 self.assertEqual(nb['name'], u'%s.ipynb' % name)
223 self.assertEqual(nb['path'], path)
223 self.assertEqual(nb['path'], path)
224 self.assertEqual(nb['type'], 'notebook')
224 self.assertEqual(nb['type'], 'notebook')
225 self.assertIn('content', nb)
225 self.assertIn('content', nb)
226 self.assertEqual(nb['format'], 'json')
226 self.assertEqual(nb['format'], 'json')
227 self.assertIn('content', nb)
227 self.assertIn('content', nb)
228 self.assertIn('metadata', nb['content'])
228 self.assertIn('metadata', nb['content'])
229 self.assertIsInstance(nb['content']['metadata'], dict)
229 self.assertIsInstance(nb['content']['metadata'], dict)
230
230
231 def test_get_contents_no_such_file(self):
231 def test_get_contents_no_such_file(self):
232 # Name that doesn't exist - should be a 404
232 # Name that doesn't exist - should be a 404
233 with assert_http_error(404):
233 with assert_http_error(404):
234 self.api.read('foo/q.ipynb')
234 self.api.read('foo/q.ipynb')
235
235
236 def test_get_text_file_contents(self):
236 def test_get_text_file_contents(self):
237 for d, name in self.dirs_nbs:
237 for d, name in self.dirs_nbs:
238 path = url_path_join(d, name + '.txt')
238 path = url_path_join(d, name + '.txt')
239 model = self.api.read(path).json()
239 model = self.api.read(path).json()
240 self.assertEqual(model['name'], u'%s.txt' % name)
240 self.assertEqual(model['name'], u'%s.txt' % name)
241 self.assertEqual(model['path'], path)
241 self.assertEqual(model['path'], path)
242 self.assertIn('content', model)
242 self.assertIn('content', model)
243 self.assertEqual(model['format'], 'text')
243 self.assertEqual(model['format'], 'text')
244 self.assertEqual(model['type'], 'file')
244 self.assertEqual(model['type'], 'file')
245 self.assertEqual(model['content'], self._txt_for_name(name))
245 self.assertEqual(model['content'], self._txt_for_name(name))
246
246
247 # Name that doesn't exist - should be a 404
247 # Name that doesn't exist - should be a 404
248 with assert_http_error(404):
248 with assert_http_error(404):
249 self.api.read('foo/q.txt')
249 self.api.read('foo/q.txt')
250
250
251 # Specifying format=text should fail on a non-UTF-8 file
251 # Specifying format=text should fail on a non-UTF-8 file
252 with assert_http_error(400):
252 with assert_http_error(400):
253 self.api.read('foo/bar/baz.blob', type_='file', format='text')
253 self.api.read('foo/bar/baz.blob', type_='file', format='text')
254
254
255 def test_get_binary_file_contents(self):
255 def test_get_binary_file_contents(self):
256 for d, name in self.dirs_nbs:
256 for d, name in self.dirs_nbs:
257 path = url_path_join(d, name + '.blob')
257 path = url_path_join(d, name + '.blob')
258 model = self.api.read(path).json()
258 model = self.api.read(path).json()
259 self.assertEqual(model['name'], u'%s.blob' % name)
259 self.assertEqual(model['name'], u'%s.blob' % name)
260 self.assertEqual(model['path'], path)
260 self.assertEqual(model['path'], path)
261 self.assertIn('content', model)
261 self.assertIn('content', model)
262 self.assertEqual(model['format'], 'base64')
262 self.assertEqual(model['format'], 'base64')
263 self.assertEqual(model['type'], 'file')
263 self.assertEqual(model['type'], 'file')
264 b64_data = base64.encodestring(self._blob_for_name(name)).decode('ascii')
264 b64_data = base64.encodestring(self._blob_for_name(name)).decode('ascii')
265 self.assertEqual(model['content'], b64_data)
265 self.assertEqual(model['content'], b64_data)
266
266
267 # Name that doesn't exist - should be a 404
267 # Name that doesn't exist - should be a 404
268 with assert_http_error(404):
268 with assert_http_error(404):
269 self.api.read('foo/q.txt')
269 self.api.read('foo/q.txt')
270
270
271 def test_get_bad_type(self):
271 def test_get_bad_type(self):
272 with assert_http_error(400):
272 with assert_http_error(400):
273 self.api.read(u'unicodΓ©', type_='file') # this is a directory
273 self.api.read(u'unicodΓ©', type_='file') # this is a directory
274
274
275 with assert_http_error(400):
275 with assert_http_error(400):
276 self.api.read(u'unicodΓ©/innonascii.ipynb', type_='directory')
276 self.api.read(u'unicodΓ©/innonascii.ipynb', type_='directory')
277
277
278 def _check_created(self, resp, path, type='notebook'):
278 def _check_created(self, resp, path, type='notebook'):
279 self.assertEqual(resp.status_code, 201)
279 self.assertEqual(resp.status_code, 201)
280 location_header = py3compat.str_to_unicode(resp.headers['Location'])
280 location_header = py3compat.str_to_unicode(resp.headers['Location'])
281 self.assertEqual(location_header, url_escape(url_path_join(u'/api/contents', path)))
281 self.assertEqual(location_header, url_escape(url_path_join(u'/api/contents', path)))
282 rjson = resp.json()
282 rjson = resp.json()
283 self.assertEqual(rjson['name'], path.rsplit('/', 1)[-1])
283 self.assertEqual(rjson['name'], path.rsplit('/', 1)[-1])
284 self.assertEqual(rjson['path'], path)
284 self.assertEqual(rjson['path'], path)
285 self.assertEqual(rjson['type'], type)
285 self.assertEqual(rjson['type'], type)
286 isright = os.path.isdir if type == 'directory' else os.path.isfile
286 isright = os.path.isdir if type == 'directory' else os.path.isfile
287 assert isright(pjoin(
287 assert isright(pjoin(
288 self.notebook_dir.name,
288 self.notebook_dir.name,
289 path.replace('/', os.sep),
289 path.replace('/', os.sep),
290 ))
290 ))
291
291
292 def test_create_untitled(self):
292 def test_create_untitled(self):
293 resp = self.api.create_untitled(path=u'Γ₯ b')
293 resp = self.api.create_untitled(path=u'Γ₯ b')
294 self._check_created(resp, u'Γ₯ b/Untitled0.ipynb')
294 self._check_created(resp, u'Γ₯ b/Untitled.ipynb')
295
295
296 # Second time
296 # Second time
297 resp = self.api.create_untitled(path=u'Γ₯ b')
297 resp = self.api.create_untitled(path=u'Γ₯ b')
298 self._check_created(resp, u'Γ₯ b/Untitled1.ipynb')
298 self._check_created(resp, u'Γ₯ b/Untitled1.ipynb')
299
299
300 # And two directories down
300 # And two directories down
301 resp = self.api.create_untitled(path='foo/bar')
301 resp = self.api.create_untitled(path='foo/bar')
302 self._check_created(resp, 'foo/bar/Untitled0.ipynb')
302 self._check_created(resp, 'foo/bar/Untitled.ipynb')
303
303
304 def test_create_untitled_txt(self):
304 def test_create_untitled_txt(self):
305 resp = self.api.create_untitled(path='foo/bar', ext='.txt')
305 resp = self.api.create_untitled(path='foo/bar', ext='.txt')
306 self._check_created(resp, 'foo/bar/untitled0.txt', type='file')
306 self._check_created(resp, 'foo/bar/untitled.txt', type='file')
307
307
308 resp = self.api.read(path='foo/bar/untitled0.txt')
308 resp = self.api.read(path='foo/bar/untitled.txt')
309 model = resp.json()
309 model = resp.json()
310 self.assertEqual(model['type'], 'file')
310 self.assertEqual(model['type'], 'file')
311 self.assertEqual(model['format'], 'text')
311 self.assertEqual(model['format'], 'text')
312 self.assertEqual(model['content'], '')
312 self.assertEqual(model['content'], '')
313
313
314 def test_upload(self):
314 def test_upload(self):
315 nb = new_notebook()
315 nb = new_notebook()
316 nbmodel = {'content': nb, 'type': 'notebook'}
316 nbmodel = {'content': nb, 'type': 'notebook'}
317 path = u'Γ₯ b/Upload tΓ©st.ipynb'
317 path = u'Γ₯ b/Upload tΓ©st.ipynb'
318 resp = self.api.upload(path, body=json.dumps(nbmodel))
318 resp = self.api.upload(path, body=json.dumps(nbmodel))
319 self._check_created(resp, path)
319 self._check_created(resp, path)
320
320
321 def test_mkdir_untitled(self):
321 def test_mkdir_untitled(self):
322 resp = self.api.mkdir_untitled(path=u'Γ₯ b')
322 resp = self.api.mkdir_untitled(path=u'Γ₯ b')
323 self._check_created(resp, u'Γ₯ b/Untitled Folder0', type='directory')
323 self._check_created(resp, u'Γ₯ b/Untitled Folder', type='directory')
324
324
325 # Second time
325 # Second time
326 resp = self.api.mkdir_untitled(path=u'Γ₯ b')
326 resp = self.api.mkdir_untitled(path=u'Γ₯ b')
327 self._check_created(resp, u'Γ₯ b/Untitled Folder1', type='directory')
327 self._check_created(resp, u'Γ₯ b/Untitled Folder 1', type='directory')
328
328
329 # And two directories down
329 # And two directories down
330 resp = self.api.mkdir_untitled(path='foo/bar')
330 resp = self.api.mkdir_untitled(path='foo/bar')
331 self._check_created(resp, 'foo/bar/Untitled Folder0', type='directory')
331 self._check_created(resp, 'foo/bar/Untitled Folder', type='directory')
332
332
333 def test_mkdir(self):
333 def test_mkdir(self):
334 path = u'Γ₯ b/New βˆ‚ir'
334 path = u'Γ₯ b/New βˆ‚ir'
335 resp = self.api.mkdir(path)
335 resp = self.api.mkdir(path)
336 self._check_created(resp, path, type='directory')
336 self._check_created(resp, path, type='directory')
337
337
338 def test_mkdir_hidden_400(self):
338 def test_mkdir_hidden_400(self):
339 with assert_http_error(400):
339 with assert_http_error(400):
340 resp = self.api.mkdir(u'Γ₯ b/.hidden')
340 resp = self.api.mkdir(u'Γ₯ b/.hidden')
341
341
342 def test_upload_txt(self):
342 def test_upload_txt(self):
343 body = u'ΓΌnicode tΓ©xt'
343 body = u'ΓΌnicode tΓ©xt'
344 model = {
344 model = {
345 'content' : body,
345 'content' : body,
346 'format' : 'text',
346 'format' : 'text',
347 'type' : 'file',
347 'type' : 'file',
348 }
348 }
349 path = u'Γ₯ b/Upload tΓ©st.txt'
349 path = u'Γ₯ b/Upload tΓ©st.txt'
350 resp = self.api.upload(path, body=json.dumps(model))
350 resp = self.api.upload(path, body=json.dumps(model))
351
351
352 # check roundtrip
352 # check roundtrip
353 resp = self.api.read(path)
353 resp = self.api.read(path)
354 model = resp.json()
354 model = resp.json()
355 self.assertEqual(model['type'], 'file')
355 self.assertEqual(model['type'], 'file')
356 self.assertEqual(model['format'], 'text')
356 self.assertEqual(model['format'], 'text')
357 self.assertEqual(model['content'], body)
357 self.assertEqual(model['content'], body)
358
358
359 def test_upload_b64(self):
359 def test_upload_b64(self):
360 body = b'\xFFblob'
360 body = b'\xFFblob'
361 b64body = base64.encodestring(body).decode('ascii')
361 b64body = base64.encodestring(body).decode('ascii')
362 model = {
362 model = {
363 'content' : b64body,
363 'content' : b64body,
364 'format' : 'base64',
364 'format' : 'base64',
365 'type' : 'file',
365 'type' : 'file',
366 }
366 }
367 path = u'Γ₯ b/Upload tΓ©st.blob'
367 path = u'Γ₯ b/Upload tΓ©st.blob'
368 resp = self.api.upload(path, body=json.dumps(model))
368 resp = self.api.upload(path, body=json.dumps(model))
369
369
370 # check roundtrip
370 # check roundtrip
371 resp = self.api.read(path)
371 resp = self.api.read(path)
372 model = resp.json()
372 model = resp.json()
373 self.assertEqual(model['type'], 'file')
373 self.assertEqual(model['type'], 'file')
374 self.assertEqual(model['path'], path)
374 self.assertEqual(model['path'], path)
375 self.assertEqual(model['format'], 'base64')
375 self.assertEqual(model['format'], 'base64')
376 decoded = base64.decodestring(model['content'].encode('ascii'))
376 decoded = base64.decodestring(model['content'].encode('ascii'))
377 self.assertEqual(decoded, body)
377 self.assertEqual(decoded, body)
378
378
379 def test_upload_v2(self):
379 def test_upload_v2(self):
380 nb = v2.new_notebook()
380 nb = v2.new_notebook()
381 ws = v2.new_worksheet()
381 ws = v2.new_worksheet()
382 nb.worksheets.append(ws)
382 nb.worksheets.append(ws)
383 ws.cells.append(v2.new_code_cell(input='print("hi")'))
383 ws.cells.append(v2.new_code_cell(input='print("hi")'))
384 nbmodel = {'content': nb, 'type': 'notebook'}
384 nbmodel = {'content': nb, 'type': 'notebook'}
385 path = u'Γ₯ b/Upload tΓ©st.ipynb'
385 path = u'Γ₯ b/Upload tΓ©st.ipynb'
386 resp = self.api.upload(path, body=json.dumps(nbmodel))
386 resp = self.api.upload(path, body=json.dumps(nbmodel))
387 self._check_created(resp, path)
387 self._check_created(resp, path)
388 resp = self.api.read(path)
388 resp = self.api.read(path)
389 data = resp.json()
389 data = resp.json()
390 self.assertEqual(data['content']['nbformat'], 4)
390 self.assertEqual(data['content']['nbformat'], 4)
391
391
392 def test_copy(self):
392 def test_copy(self):
393 resp = self.api.copy(u'Γ₯ b/Γ§ d.ipynb', u'unicodΓ©')
393 resp = self.api.copy(u'Γ₯ b/Γ§ d.ipynb', u'Γ₯ b')
394 self._check_created(resp, u'unicodΓ©/Γ§ d-Copy0.ipynb')
394 self._check_created(resp, u'Γ₯ b/Γ§ d-Copy1.ipynb')
395
395
396 resp = self.api.copy(u'Γ₯ b/Γ§ d.ipynb', u'Γ₯ b')
396 resp = self.api.copy(u'Γ₯ b/Γ§ d.ipynb', u'Γ₯ b')
397 self._check_created(resp, u'Γ₯ b/Γ§ d-Copy0.ipynb')
397 self._check_created(resp, u'Γ₯ b/Γ§ d-Copy2.ipynb')
398
398
399 def test_copy_copy(self):
400 resp = self.api.copy(u'Γ₯ b/Γ§ d.ipynb', u'Γ₯ b')
401 self._check_created(resp, u'Γ₯ b/Γ§ d-Copy1.ipynb')
402
403 resp = self.api.copy(u'Γ₯ b/Γ§ d-Copy1.ipynb', u'Γ₯ b')
404 self._check_created(resp, u'Γ₯ b/Γ§ d-Copy2.ipynb')
405
399 def test_copy_path(self):
406 def test_copy_path(self):
400 resp = self.api.copy(u'foo/a.ipynb', u'Γ₯ b')
407 resp = self.api.copy(u'foo/a.ipynb', u'Γ₯ b')
401 self._check_created(resp, u'Γ₯ b/a-Copy0.ipynb')
408 self._check_created(resp, u'Γ₯ b/a.ipynb')
409
410 resp = self.api.copy(u'foo/a.ipynb', u'Γ₯ b')
411 self._check_created(resp, u'Γ₯ b/a-Copy1.ipynb')
402
412
403 def test_copy_put_400(self):
413 def test_copy_put_400(self):
404 with assert_http_error(400):
414 with assert_http_error(400):
405 resp = self.api.copy_put(u'Γ₯ b/Γ§ d.ipynb', u'Γ₯ b/cΓΈpy.ipynb')
415 resp = self.api.copy_put(u'Γ₯ b/Γ§ d.ipynb', u'Γ₯ b/cΓΈpy.ipynb')
406
416
407 def test_copy_dir_400(self):
417 def test_copy_dir_400(self):
408 # can't copy directories
418 # can't copy directories
409 with assert_http_error(400):
419 with assert_http_error(400):
410 resp = self.api.copy(u'Γ₯ b', u'foo')
420 resp = self.api.copy(u'Γ₯ b', u'foo')
411
421
412 def test_delete(self):
422 def test_delete(self):
413 for d, name in self.dirs_nbs:
423 for d, name in self.dirs_nbs:
414 print('%r, %r' % (d, name))
424 print('%r, %r' % (d, name))
415 resp = self.api.delete(url_path_join(d, name + '.ipynb'))
425 resp = self.api.delete(url_path_join(d, name + '.ipynb'))
416 self.assertEqual(resp.status_code, 204)
426 self.assertEqual(resp.status_code, 204)
417
427
418 for d in self.dirs + ['/']:
428 for d in self.dirs + ['/']:
419 nbs = notebooks_only(self.api.list(d).json())
429 nbs = notebooks_only(self.api.list(d).json())
420 print('------')
430 print('------')
421 print(d)
431 print(d)
422 print(nbs)
432 print(nbs)
423 self.assertEqual(nbs, [])
433 self.assertEqual(nbs, [])
424
434
425 def test_delete_dirs(self):
435 def test_delete_dirs(self):
426 # depth-first delete everything, so we don't try to delete empty directories
436 # depth-first delete everything, so we don't try to delete empty directories
427 for name in sorted(self.dirs + ['/'], key=len, reverse=True):
437 for name in sorted(self.dirs + ['/'], key=len, reverse=True):
428 listing = self.api.list(name).json()['content']
438 listing = self.api.list(name).json()['content']
429 for model in listing:
439 for model in listing:
430 self.api.delete(model['path'])
440 self.api.delete(model['path'])
431 listing = self.api.list('/').json()['content']
441 listing = self.api.list('/').json()['content']
432 self.assertEqual(listing, [])
442 self.assertEqual(listing, [])
433
443
434 def test_delete_non_empty_dir(self):
444 def test_delete_non_empty_dir(self):
435 """delete non-empty dir raises 400"""
445 """delete non-empty dir raises 400"""
436 with assert_http_error(400):
446 with assert_http_error(400):
437 self.api.delete(u'Γ₯ b')
447 self.api.delete(u'Γ₯ b')
438
448
439 def test_rename(self):
449 def test_rename(self):
440 resp = self.api.rename('foo/a.ipynb', 'foo/z.ipynb')
450 resp = self.api.rename('foo/a.ipynb', 'foo/z.ipynb')
441 self.assertEqual(resp.headers['Location'].split('/')[-1], 'z.ipynb')
451 self.assertEqual(resp.headers['Location'].split('/')[-1], 'z.ipynb')
442 self.assertEqual(resp.json()['name'], 'z.ipynb')
452 self.assertEqual(resp.json()['name'], 'z.ipynb')
443 self.assertEqual(resp.json()['path'], 'foo/z.ipynb')
453 self.assertEqual(resp.json()['path'], 'foo/z.ipynb')
444 assert os.path.isfile(pjoin(self.notebook_dir.name, 'foo', 'z.ipynb'))
454 assert os.path.isfile(pjoin(self.notebook_dir.name, 'foo', 'z.ipynb'))
445
455
446 nbs = notebooks_only(self.api.list('foo').json())
456 nbs = notebooks_only(self.api.list('foo').json())
447 nbnames = set(n['name'] for n in nbs)
457 nbnames = set(n['name'] for n in nbs)
448 self.assertIn('z.ipynb', nbnames)
458 self.assertIn('z.ipynb', nbnames)
449 self.assertNotIn('a.ipynb', nbnames)
459 self.assertNotIn('a.ipynb', nbnames)
450
460
451 def test_rename_existing(self):
461 def test_rename_existing(self):
452 with assert_http_error(409):
462 with assert_http_error(409):
453 self.api.rename('foo/a.ipynb', 'foo/b.ipynb')
463 self.api.rename('foo/a.ipynb', 'foo/b.ipynb')
454
464
455 def test_save(self):
465 def test_save(self):
456 resp = self.api.read('foo/a.ipynb')
466 resp = self.api.read('foo/a.ipynb')
457 nbcontent = json.loads(resp.text)['content']
467 nbcontent = json.loads(resp.text)['content']
458 nb = from_dict(nbcontent)
468 nb = from_dict(nbcontent)
459 nb.cells.append(new_markdown_cell(u'Created by test Β³'))
469 nb.cells.append(new_markdown_cell(u'Created by test Β³'))
460
470
461 nbmodel= {'content': nb, 'type': 'notebook'}
471 nbmodel= {'content': nb, 'type': 'notebook'}
462 resp = self.api.save('foo/a.ipynb', body=json.dumps(nbmodel))
472 resp = self.api.save('foo/a.ipynb', body=json.dumps(nbmodel))
463
473
464 nbfile = pjoin(self.notebook_dir.name, 'foo', 'a.ipynb')
474 nbfile = pjoin(self.notebook_dir.name, 'foo', 'a.ipynb')
465 with io.open(nbfile, 'r', encoding='utf-8') as f:
475 with io.open(nbfile, 'r', encoding='utf-8') as f:
466 newnb = read(f, as_version=4)
476 newnb = read(f, as_version=4)
467 self.assertEqual(newnb.cells[0].source,
477 self.assertEqual(newnb.cells[0].source,
468 u'Created by test Β³')
478 u'Created by test Β³')
469 nbcontent = self.api.read('foo/a.ipynb').json()['content']
479 nbcontent = self.api.read('foo/a.ipynb').json()['content']
470 newnb = from_dict(nbcontent)
480 newnb = from_dict(nbcontent)
471 self.assertEqual(newnb.cells[0].source,
481 self.assertEqual(newnb.cells[0].source,
472 u'Created by test Β³')
482 u'Created by test Β³')
473
483
474
484
475 def test_checkpoints(self):
485 def test_checkpoints(self):
476 resp = self.api.read('foo/a.ipynb')
486 resp = self.api.read('foo/a.ipynb')
477 r = self.api.new_checkpoint('foo/a.ipynb')
487 r = self.api.new_checkpoint('foo/a.ipynb')
478 self.assertEqual(r.status_code, 201)
488 self.assertEqual(r.status_code, 201)
479 cp1 = r.json()
489 cp1 = r.json()
480 self.assertEqual(set(cp1), {'id', 'last_modified'})
490 self.assertEqual(set(cp1), {'id', 'last_modified'})
481 self.assertEqual(r.headers['Location'].split('/')[-1], cp1['id'])
491 self.assertEqual(r.headers['Location'].split('/')[-1], cp1['id'])
482
492
483 # Modify it
493 # Modify it
484 nbcontent = json.loads(resp.text)['content']
494 nbcontent = json.loads(resp.text)['content']
485 nb = from_dict(nbcontent)
495 nb = from_dict(nbcontent)
486 hcell = new_markdown_cell('Created by test')
496 hcell = new_markdown_cell('Created by test')
487 nb.cells.append(hcell)
497 nb.cells.append(hcell)
488 # Save
498 # Save
489 nbmodel= {'content': nb, 'type': 'notebook'}
499 nbmodel= {'content': nb, 'type': 'notebook'}
490 resp = self.api.save('foo/a.ipynb', body=json.dumps(nbmodel))
500 resp = self.api.save('foo/a.ipynb', body=json.dumps(nbmodel))
491
501
492 # List checkpoints
502 # List checkpoints
493 cps = self.api.get_checkpoints('foo/a.ipynb').json()
503 cps = self.api.get_checkpoints('foo/a.ipynb').json()
494 self.assertEqual(cps, [cp1])
504 self.assertEqual(cps, [cp1])
495
505
496 nbcontent = self.api.read('foo/a.ipynb').json()['content']
506 nbcontent = self.api.read('foo/a.ipynb').json()['content']
497 nb = from_dict(nbcontent)
507 nb = from_dict(nbcontent)
498 self.assertEqual(nb.cells[0].source, 'Created by test')
508 self.assertEqual(nb.cells[0].source, 'Created by test')
499
509
500 # Restore cp1
510 # Restore cp1
501 r = self.api.restore_checkpoint('foo/a.ipynb', cp1['id'])
511 r = self.api.restore_checkpoint('foo/a.ipynb', cp1['id'])
502 self.assertEqual(r.status_code, 204)
512 self.assertEqual(r.status_code, 204)
503 nbcontent = self.api.read('foo/a.ipynb').json()['content']
513 nbcontent = self.api.read('foo/a.ipynb').json()['content']
504 nb = from_dict(nbcontent)
514 nb = from_dict(nbcontent)
505 self.assertEqual(nb.cells, [])
515 self.assertEqual(nb.cells, [])
506
516
507 # Delete cp1
517 # Delete cp1
508 r = self.api.delete_checkpoint('foo/a.ipynb', cp1['id'])
518 r = self.api.delete_checkpoint('foo/a.ipynb', cp1['id'])
509 self.assertEqual(r.status_code, 204)
519 self.assertEqual(r.status_code, 204)
510 cps = self.api.get_checkpoints('foo/a.ipynb').json()
520 cps = self.api.get_checkpoints('foo/a.ipynb').json()
511 self.assertEqual(cps, [])
521 self.assertEqual(cps, [])
@@ -1,365 +1,369 b''
1 # coding: utf-8
1 # coding: utf-8
2 """Tests for the notebook manager."""
2 """Tests for the notebook manager."""
3 from __future__ import print_function
3 from __future__ import print_function
4
4
5 import logging
5 import logging
6 import os
6 import os
7
7
8 from tornado.web import HTTPError
8 from tornado.web import HTTPError
9 from unittest import TestCase
9 from unittest import TestCase
10 from tempfile import NamedTemporaryFile
10 from tempfile import NamedTemporaryFile
11
11
12 from IPython.nbformat import v4 as nbformat
12 from IPython.nbformat import v4 as nbformat
13
13
14 from IPython.utils.tempdir import TemporaryDirectory
14 from IPython.utils.tempdir import TemporaryDirectory
15 from IPython.utils.traitlets import TraitError
15 from IPython.utils.traitlets import TraitError
16 from IPython.html.utils import url_path_join
16 from IPython.html.utils import url_path_join
17 from IPython.testing import decorators as dec
17 from IPython.testing import decorators as dec
18
18
19 from ..filemanager import FileContentsManager
19 from ..filemanager import FileContentsManager
20 from ..manager import ContentsManager
20 from ..manager import ContentsManager
21
21
22
22
23 class TestFileContentsManager(TestCase):
23 class TestFileContentsManager(TestCase):
24
24
25 def test_root_dir(self):
25 def test_root_dir(self):
26 with TemporaryDirectory() as td:
26 with TemporaryDirectory() as td:
27 fm = FileContentsManager(root_dir=td)
27 fm = FileContentsManager(root_dir=td)
28 self.assertEqual(fm.root_dir, td)
28 self.assertEqual(fm.root_dir, td)
29
29
30 def test_missing_root_dir(self):
30 def test_missing_root_dir(self):
31 with TemporaryDirectory() as td:
31 with TemporaryDirectory() as td:
32 root = os.path.join(td, 'notebook', 'dir', 'is', 'missing')
32 root = os.path.join(td, 'notebook', 'dir', 'is', 'missing')
33 self.assertRaises(TraitError, FileContentsManager, root_dir=root)
33 self.assertRaises(TraitError, FileContentsManager, root_dir=root)
34
34
35 def test_invalid_root_dir(self):
35 def test_invalid_root_dir(self):
36 with NamedTemporaryFile() as tf:
36 with NamedTemporaryFile() as tf:
37 self.assertRaises(TraitError, FileContentsManager, root_dir=tf.name)
37 self.assertRaises(TraitError, FileContentsManager, root_dir=tf.name)
38
38
39 def test_get_os_path(self):
39 def test_get_os_path(self):
40 # full filesystem path should be returned with correct operating system
40 # full filesystem path should be returned with correct operating system
41 # separators.
41 # separators.
42 with TemporaryDirectory() as td:
42 with TemporaryDirectory() as td:
43 root = td
43 root = td
44 fm = FileContentsManager(root_dir=root)
44 fm = FileContentsManager(root_dir=root)
45 path = fm._get_os_path('/path/to/notebook/test.ipynb')
45 path = fm._get_os_path('/path/to/notebook/test.ipynb')
46 rel_path_list = '/path/to/notebook/test.ipynb'.split('/')
46 rel_path_list = '/path/to/notebook/test.ipynb'.split('/')
47 fs_path = os.path.join(fm.root_dir, *rel_path_list)
47 fs_path = os.path.join(fm.root_dir, *rel_path_list)
48 self.assertEqual(path, fs_path)
48 self.assertEqual(path, fs_path)
49
49
50 fm = FileContentsManager(root_dir=root)
50 fm = FileContentsManager(root_dir=root)
51 path = fm._get_os_path('test.ipynb')
51 path = fm._get_os_path('test.ipynb')
52 fs_path = os.path.join(fm.root_dir, 'test.ipynb')
52 fs_path = os.path.join(fm.root_dir, 'test.ipynb')
53 self.assertEqual(path, fs_path)
53 self.assertEqual(path, fs_path)
54
54
55 fm = FileContentsManager(root_dir=root)
55 fm = FileContentsManager(root_dir=root)
56 path = fm._get_os_path('////test.ipynb')
56 path = fm._get_os_path('////test.ipynb')
57 fs_path = os.path.join(fm.root_dir, 'test.ipynb')
57 fs_path = os.path.join(fm.root_dir, 'test.ipynb')
58 self.assertEqual(path, fs_path)
58 self.assertEqual(path, fs_path)
59
59
60 def test_checkpoint_subdir(self):
60 def test_checkpoint_subdir(self):
61 subd = u'sub βˆ‚ir'
61 subd = u'sub βˆ‚ir'
62 cp_name = 'test-cp.ipynb'
62 cp_name = 'test-cp.ipynb'
63 with TemporaryDirectory() as td:
63 with TemporaryDirectory() as td:
64 root = td
64 root = td
65 os.mkdir(os.path.join(td, subd))
65 os.mkdir(os.path.join(td, subd))
66 fm = FileContentsManager(root_dir=root)
66 fm = FileContentsManager(root_dir=root)
67 cp_dir = fm.get_checkpoint_path('cp', 'test.ipynb')
67 cp_dir = fm.get_checkpoint_path('cp', 'test.ipynb')
68 cp_subdir = fm.get_checkpoint_path('cp', '/%s/test.ipynb' % subd)
68 cp_subdir = fm.get_checkpoint_path('cp', '/%s/test.ipynb' % subd)
69 self.assertNotEqual(cp_dir, cp_subdir)
69 self.assertNotEqual(cp_dir, cp_subdir)
70 self.assertEqual(cp_dir, os.path.join(root, fm.checkpoint_dir, cp_name))
70 self.assertEqual(cp_dir, os.path.join(root, fm.checkpoint_dir, cp_name))
71 self.assertEqual(cp_subdir, os.path.join(root, subd, fm.checkpoint_dir, cp_name))
71 self.assertEqual(cp_subdir, os.path.join(root, subd, fm.checkpoint_dir, cp_name))
72
72
73
73
74 class TestContentsManager(TestCase):
74 class TestContentsManager(TestCase):
75
75
76 def setUp(self):
76 def setUp(self):
77 self._temp_dir = TemporaryDirectory()
77 self._temp_dir = TemporaryDirectory()
78 self.td = self._temp_dir.name
78 self.td = self._temp_dir.name
79 self.contents_manager = FileContentsManager(
79 self.contents_manager = FileContentsManager(
80 root_dir=self.td,
80 root_dir=self.td,
81 log=logging.getLogger()
81 log=logging.getLogger()
82 )
82 )
83
83
84 def tearDown(self):
84 def tearDown(self):
85 self._temp_dir.cleanup()
85 self._temp_dir.cleanup()
86
86
87 def make_dir(self, abs_path, rel_path):
87 def make_dir(self, abs_path, rel_path):
88 """make subdirectory, rel_path is the relative path
88 """make subdirectory, rel_path is the relative path
89 to that directory from the location where the server started"""
89 to that directory from the location where the server started"""
90 os_path = os.path.join(abs_path, rel_path)
90 os_path = os.path.join(abs_path, rel_path)
91 try:
91 try:
92 os.makedirs(os_path)
92 os.makedirs(os_path)
93 except OSError:
93 except OSError:
94 print("Directory already exists: %r" % os_path)
94 print("Directory already exists: %r" % os_path)
95 return os_path
95 return os_path
96
96
97 def add_code_cell(self, nb):
97 def add_code_cell(self, nb):
98 output = nbformat.new_output("display_data", {'application/javascript': "alert('hi');"})
98 output = nbformat.new_output("display_data", {'application/javascript': "alert('hi');"})
99 cell = nbformat.new_code_cell("print('hi')", outputs=[output])
99 cell = nbformat.new_code_cell("print('hi')", outputs=[output])
100 nb.cells.append(cell)
100 nb.cells.append(cell)
101
101
102 def new_notebook(self):
102 def new_notebook(self):
103 cm = self.contents_manager
103 cm = self.contents_manager
104 model = cm.new_untitled(type='notebook')
104 model = cm.new_untitled(type='notebook')
105 name = model['name']
105 name = model['name']
106 path = model['path']
106 path = model['path']
107
107
108 full_model = cm.get(path)
108 full_model = cm.get(path)
109 nb = full_model['content']
109 nb = full_model['content']
110 self.add_code_cell(nb)
110 self.add_code_cell(nb)
111
111
112 cm.save(full_model, path)
112 cm.save(full_model, path)
113 return nb, name, path
113 return nb, name, path
114
114
115 def test_new_untitled(self):
115 def test_new_untitled(self):
116 cm = self.contents_manager
116 cm = self.contents_manager
117 # Test in root directory
117 # Test in root directory
118 model = cm.new_untitled(type='notebook')
118 model = cm.new_untitled(type='notebook')
119 assert isinstance(model, dict)
119 assert isinstance(model, dict)
120 self.assertIn('name', model)
120 self.assertIn('name', model)
121 self.assertIn('path', model)
121 self.assertIn('path', model)
122 self.assertIn('type', model)
122 self.assertIn('type', model)
123 self.assertEqual(model['type'], 'notebook')
123 self.assertEqual(model['type'], 'notebook')
124 self.assertEqual(model['name'], 'Untitled0.ipynb')
124 self.assertEqual(model['name'], 'Untitled.ipynb')
125 self.assertEqual(model['path'], 'Untitled0.ipynb')
125 self.assertEqual(model['path'], 'Untitled.ipynb')
126
126
127 # Test in sub-directory
127 # Test in sub-directory
128 model = cm.new_untitled(type='directory')
128 model = cm.new_untitled(type='directory')
129 assert isinstance(model, dict)
129 assert isinstance(model, dict)
130 self.assertIn('name', model)
130 self.assertIn('name', model)
131 self.assertIn('path', model)
131 self.assertIn('path', model)
132 self.assertIn('type', model)
132 self.assertIn('type', model)
133 self.assertEqual(model['type'], 'directory')
133 self.assertEqual(model['type'], 'directory')
134 self.assertEqual(model['name'], 'Untitled Folder0')
134 self.assertEqual(model['name'], 'Untitled Folder')
135 self.assertEqual(model['path'], 'Untitled Folder0')
135 self.assertEqual(model['path'], 'Untitled Folder')
136 sub_dir = model['path']
136 sub_dir = model['path']
137
137
138 model = cm.new_untitled(path=sub_dir)
138 model = cm.new_untitled(path=sub_dir)
139 assert isinstance(model, dict)
139 assert isinstance(model, dict)
140 self.assertIn('name', model)
140 self.assertIn('name', model)
141 self.assertIn('path', model)
141 self.assertIn('path', model)
142 self.assertIn('type', model)
142 self.assertIn('type', model)
143 self.assertEqual(model['type'], 'file')
143 self.assertEqual(model['type'], 'file')
144 self.assertEqual(model['name'], 'untitled0')
144 self.assertEqual(model['name'], 'untitled')
145 self.assertEqual(model['path'], '%s/untitled0' % sub_dir)
145 self.assertEqual(model['path'], '%s/untitled' % sub_dir)
146
146
147 def test_get(self):
147 def test_get(self):
148 cm = self.contents_manager
148 cm = self.contents_manager
149 # Create a notebook
149 # Create a notebook
150 model = cm.new_untitled(type='notebook')
150 model = cm.new_untitled(type='notebook')
151 name = model['name']
151 name = model['name']
152 path = model['path']
152 path = model['path']
153
153
154 # Check that we 'get' on the notebook we just created
154 # Check that we 'get' on the notebook we just created
155 model2 = cm.get(path)
155 model2 = cm.get(path)
156 assert isinstance(model2, dict)
156 assert isinstance(model2, dict)
157 self.assertIn('name', model2)
157 self.assertIn('name', model2)
158 self.assertIn('path', model2)
158 self.assertIn('path', model2)
159 self.assertEqual(model['name'], name)
159 self.assertEqual(model['name'], name)
160 self.assertEqual(model['path'], path)
160 self.assertEqual(model['path'], path)
161
161
162 nb_as_file = cm.get(path, content=True, type_='file')
162 nb_as_file = cm.get(path, content=True, type_='file')
163 self.assertEqual(nb_as_file['path'], path)
163 self.assertEqual(nb_as_file['path'], path)
164 self.assertEqual(nb_as_file['type'], 'file')
164 self.assertEqual(nb_as_file['type'], 'file')
165 self.assertEqual(nb_as_file['format'], 'text')
165 self.assertEqual(nb_as_file['format'], 'text')
166 self.assertNotIsInstance(nb_as_file['content'], dict)
166 self.assertNotIsInstance(nb_as_file['content'], dict)
167
167
168 nb_as_bin_file = cm.get(path, content=True, type_='file', format='base64')
168 nb_as_bin_file = cm.get(path, content=True, type_='file', format='base64')
169 self.assertEqual(nb_as_bin_file['format'], 'base64')
169 self.assertEqual(nb_as_bin_file['format'], 'base64')
170
170
171 # Test in sub-directory
171 # Test in sub-directory
172 sub_dir = '/foo/'
172 sub_dir = '/foo/'
173 self.make_dir(cm.root_dir, 'foo')
173 self.make_dir(cm.root_dir, 'foo')
174 model = cm.new_untitled(path=sub_dir, ext='.ipynb')
174 model = cm.new_untitled(path=sub_dir, ext='.ipynb')
175 model2 = cm.get(sub_dir + name)
175 model2 = cm.get(sub_dir + name)
176 assert isinstance(model2, dict)
176 assert isinstance(model2, dict)
177 self.assertIn('name', model2)
177 self.assertIn('name', model2)
178 self.assertIn('path', model2)
178 self.assertIn('path', model2)
179 self.assertIn('content', model2)
179 self.assertIn('content', model2)
180 self.assertEqual(model2['name'], 'Untitled0.ipynb')
180 self.assertEqual(model2['name'], 'Untitled.ipynb')
181 self.assertEqual(model2['path'], '{0}/{1}'.format(sub_dir.strip('/'), name))
181 self.assertEqual(model2['path'], '{0}/{1}'.format(sub_dir.strip('/'), name))
182
182
183 # Test getting directory model
183 # Test getting directory model
184 dirmodel = cm.get('foo')
184 dirmodel = cm.get('foo')
185 self.assertEqual(dirmodel['type'], 'directory')
185 self.assertEqual(dirmodel['type'], 'directory')
186
186
187 with self.assertRaises(HTTPError):
187 with self.assertRaises(HTTPError):
188 cm.get('foo', type_='file')
188 cm.get('foo', type_='file')
189
189
190
190
191 @dec.skip_win32
191 @dec.skip_win32
192 def test_bad_symlink(self):
192 def test_bad_symlink(self):
193 cm = self.contents_manager
193 cm = self.contents_manager
194 path = 'test bad symlink'
194 path = 'test bad symlink'
195 os_path = self.make_dir(cm.root_dir, path)
195 os_path = self.make_dir(cm.root_dir, path)
196
196
197 file_model = cm.new_untitled(path=path, ext='.txt')
197 file_model = cm.new_untitled(path=path, ext='.txt')
198
198
199 # create a broken symlink
199 # create a broken symlink
200 os.symlink("target", os.path.join(os_path, "bad symlink"))
200 os.symlink("target", os.path.join(os_path, "bad symlink"))
201 model = cm.get(path)
201 model = cm.get(path)
202 self.assertEqual(model['content'], [file_model])
202 self.assertEqual(model['content'], [file_model])
203
203
204 @dec.skip_win32
204 @dec.skip_win32
205 def test_good_symlink(self):
205 def test_good_symlink(self):
206 cm = self.contents_manager
206 cm = self.contents_manager
207 parent = 'test good symlink'
207 parent = 'test good symlink'
208 name = 'good symlink'
208 name = 'good symlink'
209 path = '{0}/{1}'.format(parent, name)
209 path = '{0}/{1}'.format(parent, name)
210 os_path = self.make_dir(cm.root_dir, parent)
210 os_path = self.make_dir(cm.root_dir, parent)
211
211
212 file_model = cm.new(path=parent + '/zfoo.txt')
212 file_model = cm.new(path=parent + '/zfoo.txt')
213
213
214 # create a good symlink
214 # create a good symlink
215 os.symlink(file_model['name'], os.path.join(os_path, name))
215 os.symlink(file_model['name'], os.path.join(os_path, name))
216 symlink_model = cm.get(path, content=False)
216 symlink_model = cm.get(path, content=False)
217 dir_model = cm.get(parent)
217 dir_model = cm.get(parent)
218 self.assertEqual(
218 self.assertEqual(
219 sorted(dir_model['content'], key=lambda x: x['name']),
219 sorted(dir_model['content'], key=lambda x: x['name']),
220 [symlink_model, file_model],
220 [symlink_model, file_model],
221 )
221 )
222
222
223 def test_update(self):
223 def test_update(self):
224 cm = self.contents_manager
224 cm = self.contents_manager
225 # Create a notebook
225 # Create a notebook
226 model = cm.new_untitled(type='notebook')
226 model = cm.new_untitled(type='notebook')
227 name = model['name']
227 name = model['name']
228 path = model['path']
228 path = model['path']
229
229
230 # Change the name in the model for rename
230 # Change the name in the model for rename
231 model['path'] = 'test.ipynb'
231 model['path'] = 'test.ipynb'
232 model = cm.update(model, path)
232 model = cm.update(model, path)
233 assert isinstance(model, dict)
233 assert isinstance(model, dict)
234 self.assertIn('name', model)
234 self.assertIn('name', model)
235 self.assertIn('path', model)
235 self.assertIn('path', model)
236 self.assertEqual(model['name'], 'test.ipynb')
236 self.assertEqual(model['name'], 'test.ipynb')
237
237
238 # Make sure the old name is gone
238 # Make sure the old name is gone
239 self.assertRaises(HTTPError, cm.get, path)
239 self.assertRaises(HTTPError, cm.get, path)
240
240
241 # Test in sub-directory
241 # Test in sub-directory
242 # Create a directory and notebook in that directory
242 # Create a directory and notebook in that directory
243 sub_dir = '/foo/'
243 sub_dir = '/foo/'
244 self.make_dir(cm.root_dir, 'foo')
244 self.make_dir(cm.root_dir, 'foo')
245 model = cm.new_untitled(path=sub_dir, type='notebook')
245 model = cm.new_untitled(path=sub_dir, type='notebook')
246 name = model['name']
246 name = model['name']
247 path = model['path']
247 path = model['path']
248
248
249 # Change the name in the model for rename
249 # Change the name in the model for rename
250 d = path.rsplit('/', 1)[0]
250 d = path.rsplit('/', 1)[0]
251 new_path = model['path'] = d + '/test_in_sub.ipynb'
251 new_path = model['path'] = d + '/test_in_sub.ipynb'
252 model = cm.update(model, path)
252 model = cm.update(model, path)
253 assert isinstance(model, dict)
253 assert isinstance(model, dict)
254 self.assertIn('name', model)
254 self.assertIn('name', model)
255 self.assertIn('path', model)
255 self.assertIn('path', model)
256 self.assertEqual(model['name'], 'test_in_sub.ipynb')
256 self.assertEqual(model['name'], 'test_in_sub.ipynb')
257 self.assertEqual(model['path'], new_path)
257 self.assertEqual(model['path'], new_path)
258
258
259 # Make sure the old name is gone
259 # Make sure the old name is gone
260 self.assertRaises(HTTPError, cm.get, path)
260 self.assertRaises(HTTPError, cm.get, path)
261
261
262 def test_save(self):
262 def test_save(self):
263 cm = self.contents_manager
263 cm = self.contents_manager
264 # Create a notebook
264 # Create a notebook
265 model = cm.new_untitled(type='notebook')
265 model = cm.new_untitled(type='notebook')
266 name = model['name']
266 name = model['name']
267 path = model['path']
267 path = model['path']
268
268
269 # Get the model with 'content'
269 # Get the model with 'content'
270 full_model = cm.get(path)
270 full_model = cm.get(path)
271
271
272 # Save the notebook
272 # Save the notebook
273 model = cm.save(full_model, path)
273 model = cm.save(full_model, path)
274 assert isinstance(model, dict)
274 assert isinstance(model, dict)
275 self.assertIn('name', model)
275 self.assertIn('name', model)
276 self.assertIn('path', model)
276 self.assertIn('path', model)
277 self.assertEqual(model['name'], name)
277 self.assertEqual(model['name'], name)
278 self.assertEqual(model['path'], path)
278 self.assertEqual(model['path'], path)
279
279
280 # Test in sub-directory
280 # Test in sub-directory
281 # Create a directory and notebook in that directory
281 # Create a directory and notebook in that directory
282 sub_dir = '/foo/'
282 sub_dir = '/foo/'
283 self.make_dir(cm.root_dir, 'foo')
283 self.make_dir(cm.root_dir, 'foo')
284 model = cm.new_untitled(path=sub_dir, type='notebook')
284 model = cm.new_untitled(path=sub_dir, type='notebook')
285 name = model['name']
285 name = model['name']
286 path = model['path']
286 path = model['path']
287 model = cm.get(path)
287 model = cm.get(path)
288
288
289 # Change the name in the model for rename
289 # Change the name in the model for rename
290 model = cm.save(model, path)
290 model = cm.save(model, path)
291 assert isinstance(model, dict)
291 assert isinstance(model, dict)
292 self.assertIn('name', model)
292 self.assertIn('name', model)
293 self.assertIn('path', model)
293 self.assertIn('path', model)
294 self.assertEqual(model['name'], 'Untitled0.ipynb')
294 self.assertEqual(model['name'], 'Untitled.ipynb')
295 self.assertEqual(model['path'], 'foo/Untitled0.ipynb')
295 self.assertEqual(model['path'], 'foo/Untitled.ipynb')
296
296
297 def test_delete(self):
297 def test_delete(self):
298 cm = self.contents_manager
298 cm = self.contents_manager
299 # Create a notebook
299 # Create a notebook
300 nb, name, path = self.new_notebook()
300 nb, name, path = self.new_notebook()
301
301
302 # Delete the notebook
302 # Delete the notebook
303 cm.delete(path)
303 cm.delete(path)
304
304
305 # Check that a 'get' on the deleted notebook raises and error
305 # Check that a 'get' on the deleted notebook raises and error
306 self.assertRaises(HTTPError, cm.get, path)
306 self.assertRaises(HTTPError, cm.get, path)
307
307
308 def test_copy(self):
308 def test_copy(self):
309 cm = self.contents_manager
309 cm = self.contents_manager
310 parent = u'Γ₯ b'
310 parent = u'Γ₯ b'
311 name = u'nb √.ipynb'
311 name = u'nb √.ipynb'
312 path = u'{0}/{1}'.format(parent, name)
312 path = u'{0}/{1}'.format(parent, name)
313 os.mkdir(os.path.join(cm.root_dir, parent))
313 os.mkdir(os.path.join(cm.root_dir, parent))
314 orig = cm.new(path=path)
314 orig = cm.new(path=path)
315
315
316 # copy with unspecified name
316 # copy with unspecified name
317 copy = cm.copy(path)
317 copy = cm.copy(path)
318 self.assertEqual(copy['name'], orig['name'].replace('.ipynb', '-Copy0.ipynb'))
318 self.assertEqual(copy['name'], orig['name'].replace('.ipynb', '-Copy1.ipynb'))
319
319
320 # copy with specified name
320 # copy with specified name
321 copy2 = cm.copy(path, u'Γ₯ b/copy 2.ipynb')
321 copy2 = cm.copy(path, u'Γ₯ b/copy 2.ipynb')
322 self.assertEqual(copy2['name'], u'copy 2.ipynb')
322 self.assertEqual(copy2['name'], u'copy 2.ipynb')
323 self.assertEqual(copy2['path'], u'Γ₯ b/copy 2.ipynb')
323 self.assertEqual(copy2['path'], u'Γ₯ b/copy 2.ipynb')
324 # copy with specified path
325 copy2 = cm.copy(path, u'/')
326 self.assertEqual(copy2['name'], name)
327 self.assertEqual(copy2['path'], name)
324
328
325 def test_trust_notebook(self):
329 def test_trust_notebook(self):
326 cm = self.contents_manager
330 cm = self.contents_manager
327 nb, name, path = self.new_notebook()
331 nb, name, path = self.new_notebook()
328
332
329 untrusted = cm.get(path)['content']
333 untrusted = cm.get(path)['content']
330 assert not cm.notary.check_cells(untrusted)
334 assert not cm.notary.check_cells(untrusted)
331
335
332 # print(untrusted)
336 # print(untrusted)
333 cm.trust_notebook(path)
337 cm.trust_notebook(path)
334 trusted = cm.get(path)['content']
338 trusted = cm.get(path)['content']
335 # print(trusted)
339 # print(trusted)
336 assert cm.notary.check_cells(trusted)
340 assert cm.notary.check_cells(trusted)
337
341
338 def test_mark_trusted_cells(self):
342 def test_mark_trusted_cells(self):
339 cm = self.contents_manager
343 cm = self.contents_manager
340 nb, name, path = self.new_notebook()
344 nb, name, path = self.new_notebook()
341
345
342 cm.mark_trusted_cells(nb, path)
346 cm.mark_trusted_cells(nb, path)
343 for cell in nb.cells:
347 for cell in nb.cells:
344 if cell.cell_type == 'code':
348 if cell.cell_type == 'code':
345 assert not cell.metadata.trusted
349 assert not cell.metadata.trusted
346
350
347 cm.trust_notebook(path)
351 cm.trust_notebook(path)
348 nb = cm.get(path)['content']
352 nb = cm.get(path)['content']
349 for cell in nb.cells:
353 for cell in nb.cells:
350 if cell.cell_type == 'code':
354 if cell.cell_type == 'code':
351 assert cell.metadata.trusted
355 assert cell.metadata.trusted
352
356
353 def test_check_and_sign(self):
357 def test_check_and_sign(self):
354 cm = self.contents_manager
358 cm = self.contents_manager
355 nb, name, path = self.new_notebook()
359 nb, name, path = self.new_notebook()
356
360
357 cm.mark_trusted_cells(nb, path)
361 cm.mark_trusted_cells(nb, path)
358 cm.check_and_sign(nb, path)
362 cm.check_and_sign(nb, path)
359 assert not cm.notary.check_signature(nb)
363 assert not cm.notary.check_signature(nb)
360
364
361 cm.trust_notebook(path)
365 cm.trust_notebook(path)
362 nb = cm.get(path)['content']
366 nb = cm.get(path)['content']
363 cm.mark_trusted_cells(nb, path)
367 cm.mark_trusted_cells(nb, path)
364 cm.check_and_sign(nb, path)
368 cm.check_and_sign(nb, path)
365 assert cm.notary.check_signature(nb)
369 assert cm.notary.check_signature(nb)
General Comments 0
You need to be logged in to leave comments. Login now