##// END OF EJS Templates
TEST: Also test deletion.
Scott Sanderson -
Show More
@@ -1,683 +1,687 b''
1 # coding: utf-8
1 # coding: utf-8
2 """Test the contents webservice API."""
2 """Test the contents webservice API."""
3
3
4 import base64
4 import base64
5 from contextlib import contextmanager
5 from contextlib import contextmanager
6 import io
6 import io
7 import json
7 import json
8 import os
8 import os
9 import shutil
9 import shutil
10 from unicodedata import normalize
10 from unicodedata import normalize
11
11
12 pjoin = os.path.join
12 pjoin = os.path.join
13
13
14 import requests
14 import requests
15
15
16 from ..filecheckpoints import GenericFileCheckpoints
16 from ..filecheckpoints import GenericFileCheckpoints
17
17
18 from IPython.config import Config
18 from IPython.config import Config
19 from IPython.html.utils import url_path_join, url_escape, to_os_path
19 from IPython.html.utils import url_path_join, url_escape, to_os_path
20 from IPython.html.tests.launchnotebook import NotebookTestBase, assert_http_error
20 from IPython.html.tests.launchnotebook import NotebookTestBase, assert_http_error
21 from IPython.nbformat import read, write, from_dict
21 from IPython.nbformat import read, write, from_dict
22 from IPython.nbformat.v4 import (
22 from IPython.nbformat.v4 import (
23 new_notebook, new_markdown_cell,
23 new_notebook, new_markdown_cell,
24 )
24 )
25 from IPython.nbformat import v2
25 from IPython.nbformat import v2
26 from IPython.utils import py3compat
26 from IPython.utils import py3compat
27 from IPython.utils.data import uniq_stable
27 from IPython.utils.data import uniq_stable
28 from IPython.utils.tempdir import TemporaryDirectory
28 from IPython.utils.tempdir import TemporaryDirectory
29
29
30
30
31 def notebooks_only(dir_model):
31 def notebooks_only(dir_model):
32 return [nb for nb in dir_model['content'] if nb['type']=='notebook']
32 return [nb for nb in dir_model['content'] if nb['type']=='notebook']
33
33
34 def dirs_only(dir_model):
34 def dirs_only(dir_model):
35 return [x for x in dir_model['content'] if x['type']=='directory']
35 return [x for x in dir_model['content'] if x['type']=='directory']
36
36
37
37
38 class API(object):
38 class API(object):
39 """Wrapper for contents API calls."""
39 """Wrapper for contents API calls."""
40 def __init__(self, base_url):
40 def __init__(self, base_url):
41 self.base_url = base_url
41 self.base_url = base_url
42
42
43 def _req(self, verb, path, body=None, params=None):
43 def _req(self, verb, path, body=None, params=None):
44 response = requests.request(verb,
44 response = requests.request(verb,
45 url_path_join(self.base_url, 'api/contents', path),
45 url_path_join(self.base_url, 'api/contents', path),
46 data=body, params=params,
46 data=body, params=params,
47 )
47 )
48 response.raise_for_status()
48 response.raise_for_status()
49 return response
49 return response
50
50
51 def list(self, path='/'):
51 def list(self, path='/'):
52 return self._req('GET', path)
52 return self._req('GET', path)
53
53
54 def read(self, path, type=None, format=None, content=None):
54 def read(self, path, type=None, format=None, content=None):
55 params = {}
55 params = {}
56 if type is not None:
56 if type is not None:
57 params['type'] = type
57 params['type'] = type
58 if format is not None:
58 if format is not None:
59 params['format'] = format
59 params['format'] = format
60 if content == False:
60 if content == False:
61 params['content'] = '0'
61 params['content'] = '0'
62 return self._req('GET', path, params=params)
62 return self._req('GET', path, params=params)
63
63
64 def create_untitled(self, path='/', ext='.ipynb'):
64 def create_untitled(self, path='/', ext='.ipynb'):
65 body = None
65 body = None
66 if ext:
66 if ext:
67 body = json.dumps({'ext': ext})
67 body = json.dumps({'ext': ext})
68 return self._req('POST', path, body)
68 return self._req('POST', path, body)
69
69
70 def mkdir_untitled(self, path='/'):
70 def mkdir_untitled(self, path='/'):
71 return self._req('POST', path, json.dumps({'type': 'directory'}))
71 return self._req('POST', path, json.dumps({'type': 'directory'}))
72
72
73 def copy(self, copy_from, path='/'):
73 def copy(self, copy_from, path='/'):
74 body = json.dumps({'copy_from':copy_from})
74 body = json.dumps({'copy_from':copy_from})
75 return self._req('POST', path, body)
75 return self._req('POST', path, body)
76
76
77 def create(self, path='/'):
77 def create(self, path='/'):
78 return self._req('PUT', path)
78 return self._req('PUT', path)
79
79
80 def upload(self, path, body):
80 def upload(self, path, body):
81 return self._req('PUT', path, body)
81 return self._req('PUT', path, body)
82
82
83 def mkdir(self, path='/'):
83 def mkdir(self, path='/'):
84 return self._req('PUT', path, json.dumps({'type': 'directory'}))
84 return self._req('PUT', path, json.dumps({'type': 'directory'}))
85
85
86 def copy_put(self, copy_from, path='/'):
86 def copy_put(self, copy_from, path='/'):
87 body = json.dumps({'copy_from':copy_from})
87 body = json.dumps({'copy_from':copy_from})
88 return self._req('PUT', path, body)
88 return self._req('PUT', path, body)
89
89
90 def save(self, path, body):
90 def save(self, path, body):
91 return self._req('PUT', path, body)
91 return self._req('PUT', path, body)
92
92
93 def delete(self, path='/'):
93 def delete(self, path='/'):
94 return self._req('DELETE', path)
94 return self._req('DELETE', path)
95
95
96 def rename(self, path, new_path):
96 def rename(self, path, new_path):
97 body = json.dumps({'path': new_path})
97 body = json.dumps({'path': new_path})
98 return self._req('PATCH', path, body)
98 return self._req('PATCH', path, body)
99
99
100 def get_checkpoints(self, path):
100 def get_checkpoints(self, path):
101 return self._req('GET', url_path_join(path, 'checkpoints'))
101 return self._req('GET', url_path_join(path, 'checkpoints'))
102
102
103 def new_checkpoint(self, path):
103 def new_checkpoint(self, path):
104 return self._req('POST', url_path_join(path, 'checkpoints'))
104 return self._req('POST', url_path_join(path, 'checkpoints'))
105
105
106 def restore_checkpoint(self, path, checkpoint_id):
106 def restore_checkpoint(self, path, checkpoint_id):
107 return self._req('POST', url_path_join(path, 'checkpoints', checkpoint_id))
107 return self._req('POST', url_path_join(path, 'checkpoints', checkpoint_id))
108
108
109 def delete_checkpoint(self, path, checkpoint_id):
109 def delete_checkpoint(self, path, checkpoint_id):
110 return self._req('DELETE', url_path_join(path, 'checkpoints', checkpoint_id))
110 return self._req('DELETE', url_path_join(path, 'checkpoints', checkpoint_id))
111
111
112 class APITest(NotebookTestBase):
112 class APITest(NotebookTestBase):
113 """Test the kernels web service API"""
113 """Test the kernels web service API"""
114 dirs_nbs = [('', 'inroot'),
114 dirs_nbs = [('', 'inroot'),
115 ('Directory with spaces in', 'inspace'),
115 ('Directory with spaces in', 'inspace'),
116 (u'unicodΓ©', 'innonascii'),
116 (u'unicodΓ©', 'innonascii'),
117 ('foo', 'a'),
117 ('foo', 'a'),
118 ('foo', 'b'),
118 ('foo', 'b'),
119 ('foo', 'name with spaces'),
119 ('foo', 'name with spaces'),
120 ('foo', u'unicodΓ©'),
120 ('foo', u'unicodΓ©'),
121 ('foo/bar', 'baz'),
121 ('foo/bar', 'baz'),
122 ('ordering', 'A'),
122 ('ordering', 'A'),
123 ('ordering', 'b'),
123 ('ordering', 'b'),
124 ('ordering', 'C'),
124 ('ordering', 'C'),
125 (u'Γ₯ b', u'Γ§ d'),
125 (u'Γ₯ b', u'Γ§ d'),
126 ]
126 ]
127 hidden_dirs = ['.hidden', '__pycache__']
127 hidden_dirs = ['.hidden', '__pycache__']
128
128
129 # Don't include root dir.
129 # Don't include root dir.
130 dirs = uniq_stable([py3compat.cast_unicode(d) for (d,n) in dirs_nbs[1:]])
130 dirs = uniq_stable([py3compat.cast_unicode(d) for (d,n) in dirs_nbs[1:]])
131 top_level_dirs = {normalize('NFC', d.split('/')[0]) for d in dirs}
131 top_level_dirs = {normalize('NFC', d.split('/')[0]) for d in dirs}
132
132
133 @staticmethod
133 @staticmethod
134 def _blob_for_name(name):
134 def _blob_for_name(name):
135 return name.encode('utf-8') + b'\xFF'
135 return name.encode('utf-8') + b'\xFF'
136
136
137 @staticmethod
137 @staticmethod
138 def _txt_for_name(name):
138 def _txt_for_name(name):
139 return u'%s text file' % name
139 return u'%s text file' % name
140
140
141 def to_os_path(self, api_path):
141 def to_os_path(self, api_path):
142 return to_os_path(api_path, root=self.notebook_dir.name)
142 return to_os_path(api_path, root=self.notebook_dir.name)
143
143
144 def make_dir(self, api_path):
144 def make_dir(self, api_path):
145 """Create a directory at api_path"""
145 """Create a directory at api_path"""
146 os_path = self.to_os_path(api_path)
146 os_path = self.to_os_path(api_path)
147 try:
147 try:
148 os.makedirs(os_path)
148 os.makedirs(os_path)
149 except OSError:
149 except OSError:
150 print("Directory already exists: %r" % os_path)
150 print("Directory already exists: %r" % os_path)
151
151
152 def make_txt(self, api_path, txt):
152 def make_txt(self, api_path, txt):
153 """Make a text file at a given api_path"""
153 """Make a text file at a given api_path"""
154 os_path = self.to_os_path(api_path)
154 os_path = self.to_os_path(api_path)
155 with io.open(os_path, 'w', encoding='utf-8') as f:
155 with io.open(os_path, 'w', encoding='utf-8') as f:
156 f.write(txt)
156 f.write(txt)
157
157
158 def make_blob(self, api_path, blob):
158 def make_blob(self, api_path, blob):
159 """Make a binary file at a given api_path"""
159 """Make a binary file at a given api_path"""
160 os_path = self.to_os_path(api_path)
160 os_path = self.to_os_path(api_path)
161 with io.open(os_path, 'wb') as f:
161 with io.open(os_path, 'wb') as f:
162 f.write(blob)
162 f.write(blob)
163
163
164 def make_nb(self, api_path, nb):
164 def make_nb(self, api_path, nb):
165 """Make a notebook file at a given api_path"""
165 """Make a notebook file at a given api_path"""
166 os_path = self.to_os_path(api_path)
166 os_path = self.to_os_path(api_path)
167
167
168 with io.open(os_path, 'w', encoding='utf-8') as f:
168 with io.open(os_path, 'w', encoding='utf-8') as f:
169 write(nb, f, version=4)
169 write(nb, f, version=4)
170
170
171 def delete_dir(self, api_path):
171 def delete_dir(self, api_path):
172 """Delete a directory at api_path, removing any contents."""
172 """Delete a directory at api_path, removing any contents."""
173 os_path = self.to_os_path(api_path)
173 os_path = self.to_os_path(api_path)
174 shutil.rmtree(os_path, ignore_errors=True)
174 shutil.rmtree(os_path, ignore_errors=True)
175
175
176 def delete_file(self, api_path):
176 def delete_file(self, api_path):
177 """Delete a file at the given path if it exists."""
177 """Delete a file at the given path if it exists."""
178 if self.isfile(api_path):
178 if self.isfile(api_path):
179 os.unlink(self.to_os_path(api_path))
179 os.unlink(self.to_os_path(api_path))
180
180
181 def isfile(self, api_path):
181 def isfile(self, api_path):
182 return os.path.isfile(self.to_os_path(api_path))
182 return os.path.isfile(self.to_os_path(api_path))
183
183
184 def isdir(self, api_path):
184 def isdir(self, api_path):
185 return os.path.isdir(self.to_os_path(api_path))
185 return os.path.isdir(self.to_os_path(api_path))
186
186
187 def setUp(self):
187 def setUp(self):
188
188
189 for d in (self.dirs + self.hidden_dirs):
189 for d in (self.dirs + self.hidden_dirs):
190 self.make_dir(d)
190 self.make_dir(d)
191
191
192 for d, name in self.dirs_nbs:
192 for d, name in self.dirs_nbs:
193 # create a notebook
193 # create a notebook
194 nb = new_notebook()
194 nb = new_notebook()
195 self.make_nb(u'{}/{}.ipynb'.format(d, name), nb)
195 self.make_nb(u'{}/{}.ipynb'.format(d, name), nb)
196
196
197 # create a text file
197 # create a text file
198 txt = self._txt_for_name(name)
198 txt = self._txt_for_name(name)
199 self.make_txt(u'{}/{}.txt'.format(d, name), txt)
199 self.make_txt(u'{}/{}.txt'.format(d, name), txt)
200
200
201 # create a binary file
201 # create a binary file
202 blob = self._blob_for_name(name)
202 blob = self._blob_for_name(name)
203 self.make_blob(u'{}/{}.blob'.format(d, name), blob)
203 self.make_blob(u'{}/{}.blob'.format(d, name), blob)
204
204
205 self.api = API(self.base_url())
205 self.api = API(self.base_url())
206
206
207 def tearDown(self):
207 def tearDown(self):
208 for dname in (list(self.top_level_dirs) + self.hidden_dirs):
208 for dname in (list(self.top_level_dirs) + self.hidden_dirs):
209 self.delete_dir(dname)
209 self.delete_dir(dname)
210 self.delete_file('inroot.ipynb')
210 self.delete_file('inroot.ipynb')
211
211
212 def test_list_notebooks(self):
212 def test_list_notebooks(self):
213 nbs = notebooks_only(self.api.list().json())
213 nbs = notebooks_only(self.api.list().json())
214 self.assertEqual(len(nbs), 1)
214 self.assertEqual(len(nbs), 1)
215 self.assertEqual(nbs[0]['name'], 'inroot.ipynb')
215 self.assertEqual(nbs[0]['name'], 'inroot.ipynb')
216
216
217 nbs = notebooks_only(self.api.list('/Directory with spaces in/').json())
217 nbs = notebooks_only(self.api.list('/Directory with spaces in/').json())
218 self.assertEqual(len(nbs), 1)
218 self.assertEqual(len(nbs), 1)
219 self.assertEqual(nbs[0]['name'], 'inspace.ipynb')
219 self.assertEqual(nbs[0]['name'], 'inspace.ipynb')
220
220
221 nbs = notebooks_only(self.api.list(u'/unicodΓ©/').json())
221 nbs = notebooks_only(self.api.list(u'/unicodΓ©/').json())
222 self.assertEqual(len(nbs), 1)
222 self.assertEqual(len(nbs), 1)
223 self.assertEqual(nbs[0]['name'], 'innonascii.ipynb')
223 self.assertEqual(nbs[0]['name'], 'innonascii.ipynb')
224 self.assertEqual(nbs[0]['path'], u'unicodΓ©/innonascii.ipynb')
224 self.assertEqual(nbs[0]['path'], u'unicodΓ©/innonascii.ipynb')
225
225
226 nbs = notebooks_only(self.api.list('/foo/bar/').json())
226 nbs = notebooks_only(self.api.list('/foo/bar/').json())
227 self.assertEqual(len(nbs), 1)
227 self.assertEqual(len(nbs), 1)
228 self.assertEqual(nbs[0]['name'], 'baz.ipynb')
228 self.assertEqual(nbs[0]['name'], 'baz.ipynb')
229 self.assertEqual(nbs[0]['path'], 'foo/bar/baz.ipynb')
229 self.assertEqual(nbs[0]['path'], 'foo/bar/baz.ipynb')
230
230
231 nbs = notebooks_only(self.api.list('foo').json())
231 nbs = notebooks_only(self.api.list('foo').json())
232 self.assertEqual(len(nbs), 4)
232 self.assertEqual(len(nbs), 4)
233 nbnames = { normalize('NFC', n['name']) for n in nbs }
233 nbnames = { normalize('NFC', n['name']) for n in nbs }
234 expected = [ u'a.ipynb', u'b.ipynb', u'name with spaces.ipynb', u'unicodΓ©.ipynb']
234 expected = [ u'a.ipynb', u'b.ipynb', u'name with spaces.ipynb', u'unicodΓ©.ipynb']
235 expected = { normalize('NFC', name) for name in expected }
235 expected = { normalize('NFC', name) for name in expected }
236 self.assertEqual(nbnames, expected)
236 self.assertEqual(nbnames, expected)
237
237
238 nbs = notebooks_only(self.api.list('ordering').json())
238 nbs = notebooks_only(self.api.list('ordering').json())
239 nbnames = [n['name'] for n in nbs]
239 nbnames = [n['name'] for n in nbs]
240 expected = ['A.ipynb', 'b.ipynb', 'C.ipynb']
240 expected = ['A.ipynb', 'b.ipynb', 'C.ipynb']
241 self.assertEqual(nbnames, expected)
241 self.assertEqual(nbnames, expected)
242
242
243 def test_list_dirs(self):
243 def test_list_dirs(self):
244 dirs = dirs_only(self.api.list().json())
244 dirs = dirs_only(self.api.list().json())
245 dir_names = {normalize('NFC', d['name']) for d in dirs}
245 dir_names = {normalize('NFC', d['name']) for d in dirs}
246 self.assertEqual(dir_names, self.top_level_dirs) # Excluding hidden dirs
246 self.assertEqual(dir_names, self.top_level_dirs) # Excluding hidden dirs
247
247
248 def test_get_dir_no_content(self):
248 def test_get_dir_no_content(self):
249 for d in self.dirs:
249 for d in self.dirs:
250 model = self.api.read(d, content=False).json()
250 model = self.api.read(d, content=False).json()
251 self.assertEqual(model['path'], d)
251 self.assertEqual(model['path'], d)
252 self.assertEqual(model['type'], 'directory')
252 self.assertEqual(model['type'], 'directory')
253 self.assertIn('content', model)
253 self.assertIn('content', model)
254 self.assertEqual(model['content'], None)
254 self.assertEqual(model['content'], None)
255
255
256 def test_list_nonexistant_dir(self):
256 def test_list_nonexistant_dir(self):
257 with assert_http_error(404):
257 with assert_http_error(404):
258 self.api.list('nonexistant')
258 self.api.list('nonexistant')
259
259
260 def test_get_nb_contents(self):
260 def test_get_nb_contents(self):
261 for d, name in self.dirs_nbs:
261 for d, name in self.dirs_nbs:
262 path = url_path_join(d, name + '.ipynb')
262 path = url_path_join(d, name + '.ipynb')
263 nb = self.api.read(path).json()
263 nb = self.api.read(path).json()
264 self.assertEqual(nb['name'], u'%s.ipynb' % name)
264 self.assertEqual(nb['name'], u'%s.ipynb' % name)
265 self.assertEqual(nb['path'], path)
265 self.assertEqual(nb['path'], path)
266 self.assertEqual(nb['type'], 'notebook')
266 self.assertEqual(nb['type'], 'notebook')
267 self.assertIn('content', nb)
267 self.assertIn('content', nb)
268 self.assertEqual(nb['format'], 'json')
268 self.assertEqual(nb['format'], 'json')
269 self.assertIn('metadata', nb['content'])
269 self.assertIn('metadata', nb['content'])
270 self.assertIsInstance(nb['content']['metadata'], dict)
270 self.assertIsInstance(nb['content']['metadata'], dict)
271
271
272 def test_get_nb_no_content(self):
272 def test_get_nb_no_content(self):
273 for d, name in self.dirs_nbs:
273 for d, name in self.dirs_nbs:
274 path = url_path_join(d, name + '.ipynb')
274 path = url_path_join(d, name + '.ipynb')
275 nb = self.api.read(path, content=False).json()
275 nb = self.api.read(path, content=False).json()
276 self.assertEqual(nb['name'], u'%s.ipynb' % name)
276 self.assertEqual(nb['name'], u'%s.ipynb' % name)
277 self.assertEqual(nb['path'], path)
277 self.assertEqual(nb['path'], path)
278 self.assertEqual(nb['type'], 'notebook')
278 self.assertEqual(nb['type'], 'notebook')
279 self.assertIn('content', nb)
279 self.assertIn('content', nb)
280 self.assertEqual(nb['content'], None)
280 self.assertEqual(nb['content'], None)
281
281
282 def test_get_contents_no_such_file(self):
282 def test_get_contents_no_such_file(self):
283 # Name that doesn't exist - should be a 404
283 # Name that doesn't exist - should be a 404
284 with assert_http_error(404):
284 with assert_http_error(404):
285 self.api.read('foo/q.ipynb')
285 self.api.read('foo/q.ipynb')
286
286
287 def test_get_text_file_contents(self):
287 def test_get_text_file_contents(self):
288 for d, name in self.dirs_nbs:
288 for d, name in self.dirs_nbs:
289 path = url_path_join(d, name + '.txt')
289 path = url_path_join(d, name + '.txt')
290 model = self.api.read(path).json()
290 model = self.api.read(path).json()
291 self.assertEqual(model['name'], u'%s.txt' % name)
291 self.assertEqual(model['name'], u'%s.txt' % name)
292 self.assertEqual(model['path'], path)
292 self.assertEqual(model['path'], path)
293 self.assertIn('content', model)
293 self.assertIn('content', model)
294 self.assertEqual(model['format'], 'text')
294 self.assertEqual(model['format'], 'text')
295 self.assertEqual(model['type'], 'file')
295 self.assertEqual(model['type'], 'file')
296 self.assertEqual(model['content'], self._txt_for_name(name))
296 self.assertEqual(model['content'], self._txt_for_name(name))
297
297
298 # Name that doesn't exist - should be a 404
298 # Name that doesn't exist - should be a 404
299 with assert_http_error(404):
299 with assert_http_error(404):
300 self.api.read('foo/q.txt')
300 self.api.read('foo/q.txt')
301
301
302 # Specifying format=text should fail on a non-UTF-8 file
302 # Specifying format=text should fail on a non-UTF-8 file
303 with assert_http_error(400):
303 with assert_http_error(400):
304 self.api.read('foo/bar/baz.blob', type='file', format='text')
304 self.api.read('foo/bar/baz.blob', type='file', format='text')
305
305
306 def test_get_binary_file_contents(self):
306 def test_get_binary_file_contents(self):
307 for d, name in self.dirs_nbs:
307 for d, name in self.dirs_nbs:
308 path = url_path_join(d, name + '.blob')
308 path = url_path_join(d, name + '.blob')
309 model = self.api.read(path).json()
309 model = self.api.read(path).json()
310 self.assertEqual(model['name'], u'%s.blob' % name)
310 self.assertEqual(model['name'], u'%s.blob' % name)
311 self.assertEqual(model['path'], path)
311 self.assertEqual(model['path'], path)
312 self.assertIn('content', model)
312 self.assertIn('content', model)
313 self.assertEqual(model['format'], 'base64')
313 self.assertEqual(model['format'], 'base64')
314 self.assertEqual(model['type'], 'file')
314 self.assertEqual(model['type'], 'file')
315 self.assertEqual(
315 self.assertEqual(
316 base64.decodestring(model['content'].encode('ascii')),
316 base64.decodestring(model['content'].encode('ascii')),
317 self._blob_for_name(name),
317 self._blob_for_name(name),
318 )
318 )
319
319
320 # Name that doesn't exist - should be a 404
320 # Name that doesn't exist - should be a 404
321 with assert_http_error(404):
321 with assert_http_error(404):
322 self.api.read('foo/q.txt')
322 self.api.read('foo/q.txt')
323
323
324 def test_get_bad_type(self):
324 def test_get_bad_type(self):
325 with assert_http_error(400):
325 with assert_http_error(400):
326 self.api.read(u'unicodΓ©', type='file') # this is a directory
326 self.api.read(u'unicodΓ©', type='file') # this is a directory
327
327
328 with assert_http_error(400):
328 with assert_http_error(400):
329 self.api.read(u'unicodΓ©/innonascii.ipynb', type='directory')
329 self.api.read(u'unicodΓ©/innonascii.ipynb', type='directory')
330
330
331 def _check_created(self, resp, path, type='notebook'):
331 def _check_created(self, resp, path, type='notebook'):
332 self.assertEqual(resp.status_code, 201)
332 self.assertEqual(resp.status_code, 201)
333 location_header = py3compat.str_to_unicode(resp.headers['Location'])
333 location_header = py3compat.str_to_unicode(resp.headers['Location'])
334 self.assertEqual(location_header, url_escape(url_path_join(u'/api/contents', path)))
334 self.assertEqual(location_header, url_escape(url_path_join(u'/api/contents', path)))
335 rjson = resp.json()
335 rjson = resp.json()
336 self.assertEqual(rjson['name'], path.rsplit('/', 1)[-1])
336 self.assertEqual(rjson['name'], path.rsplit('/', 1)[-1])
337 self.assertEqual(rjson['path'], path)
337 self.assertEqual(rjson['path'], path)
338 self.assertEqual(rjson['type'], type)
338 self.assertEqual(rjson['type'], type)
339 isright = self.isdir if type == 'directory' else self.isfile
339 isright = self.isdir if type == 'directory' else self.isfile
340 assert isright(path)
340 assert isright(path)
341
341
342 def test_create_untitled(self):
342 def test_create_untitled(self):
343 resp = self.api.create_untitled(path=u'Γ₯ b')
343 resp = self.api.create_untitled(path=u'Γ₯ b')
344 self._check_created(resp, u'Γ₯ b/Untitled.ipynb')
344 self._check_created(resp, u'Γ₯ b/Untitled.ipynb')
345
345
346 # Second time
346 # Second time
347 resp = self.api.create_untitled(path=u'Γ₯ b')
347 resp = self.api.create_untitled(path=u'Γ₯ b')
348 self._check_created(resp, u'Γ₯ b/Untitled1.ipynb')
348 self._check_created(resp, u'Γ₯ b/Untitled1.ipynb')
349
349
350 # And two directories down
350 # And two directories down
351 resp = self.api.create_untitled(path='foo/bar')
351 resp = self.api.create_untitled(path='foo/bar')
352 self._check_created(resp, 'foo/bar/Untitled.ipynb')
352 self._check_created(resp, 'foo/bar/Untitled.ipynb')
353
353
354 def test_create_untitled_txt(self):
354 def test_create_untitled_txt(self):
355 resp = self.api.create_untitled(path='foo/bar', ext='.txt')
355 resp = self.api.create_untitled(path='foo/bar', ext='.txt')
356 self._check_created(resp, 'foo/bar/untitled.txt', type='file')
356 self._check_created(resp, 'foo/bar/untitled.txt', type='file')
357
357
358 resp = self.api.read(path='foo/bar/untitled.txt')
358 resp = self.api.read(path='foo/bar/untitled.txt')
359 model = resp.json()
359 model = resp.json()
360 self.assertEqual(model['type'], 'file')
360 self.assertEqual(model['type'], 'file')
361 self.assertEqual(model['format'], 'text')
361 self.assertEqual(model['format'], 'text')
362 self.assertEqual(model['content'], '')
362 self.assertEqual(model['content'], '')
363
363
364 def test_upload(self):
364 def test_upload(self):
365 nb = new_notebook()
365 nb = new_notebook()
366 nbmodel = {'content': nb, 'type': 'notebook'}
366 nbmodel = {'content': nb, 'type': 'notebook'}
367 path = u'Γ₯ b/Upload tΓ©st.ipynb'
367 path = u'Γ₯ b/Upload tΓ©st.ipynb'
368 resp = self.api.upload(path, body=json.dumps(nbmodel))
368 resp = self.api.upload(path, body=json.dumps(nbmodel))
369 self._check_created(resp, path)
369 self._check_created(resp, path)
370
370
371 def test_mkdir_untitled(self):
371 def test_mkdir_untitled(self):
372 resp = self.api.mkdir_untitled(path=u'Γ₯ b')
372 resp = self.api.mkdir_untitled(path=u'Γ₯ b')
373 self._check_created(resp, u'Γ₯ b/Untitled Folder', type='directory')
373 self._check_created(resp, u'Γ₯ b/Untitled Folder', type='directory')
374
374
375 # Second time
375 # Second time
376 resp = self.api.mkdir_untitled(path=u'Γ₯ b')
376 resp = self.api.mkdir_untitled(path=u'Γ₯ b')
377 self._check_created(resp, u'Γ₯ b/Untitled Folder 1', type='directory')
377 self._check_created(resp, u'Γ₯ b/Untitled Folder 1', type='directory')
378
378
379 # And two directories down
379 # And two directories down
380 resp = self.api.mkdir_untitled(path='foo/bar')
380 resp = self.api.mkdir_untitled(path='foo/bar')
381 self._check_created(resp, 'foo/bar/Untitled Folder', type='directory')
381 self._check_created(resp, 'foo/bar/Untitled Folder', type='directory')
382
382
383 def test_mkdir(self):
383 def test_mkdir(self):
384 path = u'Γ₯ b/New βˆ‚ir'
384 path = u'Γ₯ b/New βˆ‚ir'
385 resp = self.api.mkdir(path)
385 resp = self.api.mkdir(path)
386 self._check_created(resp, path, type='directory')
386 self._check_created(resp, path, type='directory')
387
387
388 def test_mkdir_hidden_400(self):
388 def test_mkdir_hidden_400(self):
389 with assert_http_error(400):
389 with assert_http_error(400):
390 resp = self.api.mkdir(u'Γ₯ b/.hidden')
390 resp = self.api.mkdir(u'Γ₯ b/.hidden')
391
391
392 def test_upload_txt(self):
392 def test_upload_txt(self):
393 body = u'ΓΌnicode tΓ©xt'
393 body = u'ΓΌnicode tΓ©xt'
394 model = {
394 model = {
395 'content' : body,
395 'content' : body,
396 'format' : 'text',
396 'format' : 'text',
397 'type' : 'file',
397 'type' : 'file',
398 }
398 }
399 path = u'Γ₯ b/Upload tΓ©st.txt'
399 path = u'Γ₯ b/Upload tΓ©st.txt'
400 resp = self.api.upload(path, body=json.dumps(model))
400 resp = self.api.upload(path, body=json.dumps(model))
401
401
402 # check roundtrip
402 # check roundtrip
403 resp = self.api.read(path)
403 resp = self.api.read(path)
404 model = resp.json()
404 model = resp.json()
405 self.assertEqual(model['type'], 'file')
405 self.assertEqual(model['type'], 'file')
406 self.assertEqual(model['format'], 'text')
406 self.assertEqual(model['format'], 'text')
407 self.assertEqual(model['content'], body)
407 self.assertEqual(model['content'], body)
408
408
409 def test_upload_b64(self):
409 def test_upload_b64(self):
410 body = b'\xFFblob'
410 body = b'\xFFblob'
411 b64body = base64.encodestring(body).decode('ascii')
411 b64body = base64.encodestring(body).decode('ascii')
412 model = {
412 model = {
413 'content' : b64body,
413 'content' : b64body,
414 'format' : 'base64',
414 'format' : 'base64',
415 'type' : 'file',
415 'type' : 'file',
416 }
416 }
417 path = u'Γ₯ b/Upload tΓ©st.blob'
417 path = u'Γ₯ b/Upload tΓ©st.blob'
418 resp = self.api.upload(path, body=json.dumps(model))
418 resp = self.api.upload(path, body=json.dumps(model))
419
419
420 # check roundtrip
420 # check roundtrip
421 resp = self.api.read(path)
421 resp = self.api.read(path)
422 model = resp.json()
422 model = resp.json()
423 self.assertEqual(model['type'], 'file')
423 self.assertEqual(model['type'], 'file')
424 self.assertEqual(model['path'], path)
424 self.assertEqual(model['path'], path)
425 self.assertEqual(model['format'], 'base64')
425 self.assertEqual(model['format'], 'base64')
426 decoded = base64.decodestring(model['content'].encode('ascii'))
426 decoded = base64.decodestring(model['content'].encode('ascii'))
427 self.assertEqual(decoded, body)
427 self.assertEqual(decoded, body)
428
428
429 def test_upload_v2(self):
429 def test_upload_v2(self):
430 nb = v2.new_notebook()
430 nb = v2.new_notebook()
431 ws = v2.new_worksheet()
431 ws = v2.new_worksheet()
432 nb.worksheets.append(ws)
432 nb.worksheets.append(ws)
433 ws.cells.append(v2.new_code_cell(input='print("hi")'))
433 ws.cells.append(v2.new_code_cell(input='print("hi")'))
434 nbmodel = {'content': nb, 'type': 'notebook'}
434 nbmodel = {'content': nb, 'type': 'notebook'}
435 path = u'Γ₯ b/Upload tΓ©st.ipynb'
435 path = u'Γ₯ b/Upload tΓ©st.ipynb'
436 resp = self.api.upload(path, body=json.dumps(nbmodel))
436 resp = self.api.upload(path, body=json.dumps(nbmodel))
437 self._check_created(resp, path)
437 self._check_created(resp, path)
438 resp = self.api.read(path)
438 resp = self.api.read(path)
439 data = resp.json()
439 data = resp.json()
440 self.assertEqual(data['content']['nbformat'], 4)
440 self.assertEqual(data['content']['nbformat'], 4)
441
441
442 def test_copy(self):
442 def test_copy(self):
443 resp = self.api.copy(u'Γ₯ b/Γ§ d.ipynb', u'Γ₯ b')
443 resp = self.api.copy(u'Γ₯ b/Γ§ d.ipynb', u'Γ₯ b')
444 self._check_created(resp, u'Γ₯ b/Γ§ d-Copy1.ipynb')
444 self._check_created(resp, u'Γ₯ b/Γ§ d-Copy1.ipynb')
445
445
446 resp = self.api.copy(u'Γ₯ b/Γ§ d.ipynb', u'Γ₯ b')
446 resp = self.api.copy(u'Γ₯ b/Γ§ d.ipynb', u'Γ₯ b')
447 self._check_created(resp, u'Γ₯ b/Γ§ d-Copy2.ipynb')
447 self._check_created(resp, u'Γ₯ b/Γ§ d-Copy2.ipynb')
448
448
449 def test_copy_copy(self):
449 def test_copy_copy(self):
450 resp = self.api.copy(u'Γ₯ b/Γ§ d.ipynb', u'Γ₯ b')
450 resp = self.api.copy(u'Γ₯ b/Γ§ d.ipynb', u'Γ₯ b')
451 self._check_created(resp, u'Γ₯ b/Γ§ d-Copy1.ipynb')
451 self._check_created(resp, u'Γ₯ b/Γ§ d-Copy1.ipynb')
452
452
453 resp = self.api.copy(u'Γ₯ b/Γ§ d-Copy1.ipynb', u'Γ₯ b')
453 resp = self.api.copy(u'Γ₯ b/Γ§ d-Copy1.ipynb', u'Γ₯ b')
454 self._check_created(resp, u'Γ₯ b/Γ§ d-Copy2.ipynb')
454 self._check_created(resp, u'Γ₯ b/Γ§ d-Copy2.ipynb')
455
455
456 def test_copy_path(self):
456 def test_copy_path(self):
457 resp = self.api.copy(u'foo/a.ipynb', u'Γ₯ b')
457 resp = self.api.copy(u'foo/a.ipynb', u'Γ₯ b')
458 self._check_created(resp, u'Γ₯ b/a.ipynb')
458 self._check_created(resp, u'Γ₯ b/a.ipynb')
459
459
460 resp = self.api.copy(u'foo/a.ipynb', u'Γ₯ b')
460 resp = self.api.copy(u'foo/a.ipynb', u'Γ₯ b')
461 self._check_created(resp, u'Γ₯ b/a-Copy1.ipynb')
461 self._check_created(resp, u'Γ₯ b/a-Copy1.ipynb')
462
462
463 def test_copy_put_400(self):
463 def test_copy_put_400(self):
464 with assert_http_error(400):
464 with assert_http_error(400):
465 resp = self.api.copy_put(u'Γ₯ b/Γ§ d.ipynb', u'Γ₯ b/cΓΈpy.ipynb')
465 resp = self.api.copy_put(u'Γ₯ b/Γ§ d.ipynb', u'Γ₯ b/cΓΈpy.ipynb')
466
466
467 def test_copy_dir_400(self):
467 def test_copy_dir_400(self):
468 # can't copy directories
468 # can't copy directories
469 with assert_http_error(400):
469 with assert_http_error(400):
470 resp = self.api.copy(u'Γ₯ b', u'foo')
470 resp = self.api.copy(u'Γ₯ b', u'foo')
471
471
472 def test_delete(self):
472 def test_delete(self):
473 for d, name in self.dirs_nbs:
473 for d, name in self.dirs_nbs:
474 print('%r, %r' % (d, name))
474 print('%r, %r' % (d, name))
475 resp = self.api.delete(url_path_join(d, name + '.ipynb'))
475 resp = self.api.delete(url_path_join(d, name + '.ipynb'))
476 self.assertEqual(resp.status_code, 204)
476 self.assertEqual(resp.status_code, 204)
477
477
478 for d in self.dirs + ['/']:
478 for d in self.dirs + ['/']:
479 nbs = notebooks_only(self.api.list(d).json())
479 nbs = notebooks_only(self.api.list(d).json())
480 print('------')
480 print('------')
481 print(d)
481 print(d)
482 print(nbs)
482 print(nbs)
483 self.assertEqual(nbs, [])
483 self.assertEqual(nbs, [])
484
484
485 def test_delete_dirs(self):
485 def test_delete_dirs(self):
486 # depth-first delete everything, so we don't try to delete empty directories
486 # depth-first delete everything, so we don't try to delete empty directories
487 for name in sorted(self.dirs + ['/'], key=len, reverse=True):
487 for name in sorted(self.dirs + ['/'], key=len, reverse=True):
488 listing = self.api.list(name).json()['content']
488 listing = self.api.list(name).json()['content']
489 for model in listing:
489 for model in listing:
490 self.api.delete(model['path'])
490 self.api.delete(model['path'])
491 listing = self.api.list('/').json()['content']
491 listing = self.api.list('/').json()['content']
492 self.assertEqual(listing, [])
492 self.assertEqual(listing, [])
493
493
494 def test_delete_non_empty_dir(self):
494 def test_delete_non_empty_dir(self):
495 """delete non-empty dir raises 400"""
495 """delete non-empty dir raises 400"""
496 with assert_http_error(400):
496 with assert_http_error(400):
497 self.api.delete(u'Γ₯ b')
497 self.api.delete(u'Γ₯ b')
498
498
499 def test_rename(self):
499 def test_rename(self):
500 resp = self.api.rename('foo/a.ipynb', 'foo/z.ipynb')
500 resp = self.api.rename('foo/a.ipynb', 'foo/z.ipynb')
501 self.assertEqual(resp.headers['Location'].split('/')[-1], 'z.ipynb')
501 self.assertEqual(resp.headers['Location'].split('/')[-1], 'z.ipynb')
502 self.assertEqual(resp.json()['name'], 'z.ipynb')
502 self.assertEqual(resp.json()['name'], 'z.ipynb')
503 self.assertEqual(resp.json()['path'], 'foo/z.ipynb')
503 self.assertEqual(resp.json()['path'], 'foo/z.ipynb')
504 assert self.isfile('foo/z.ipynb')
504 assert self.isfile('foo/z.ipynb')
505
505
506 nbs = notebooks_only(self.api.list('foo').json())
506 nbs = notebooks_only(self.api.list('foo').json())
507 nbnames = set(n['name'] for n in nbs)
507 nbnames = set(n['name'] for n in nbs)
508 self.assertIn('z.ipynb', nbnames)
508 self.assertIn('z.ipynb', nbnames)
509 self.assertNotIn('a.ipynb', nbnames)
509 self.assertNotIn('a.ipynb', nbnames)
510
510
511 def test_rename_preserves_checkpoints(self):
511 def test_checkpoints_follow_file(self):
512
512
513 # Read initial file state
513 # Read initial file state
514 orig = self.api.read('foo/a.ipynb')
514 orig = self.api.read('foo/a.ipynb')
515
515
516 # Create a checkpoint of initial state
516 # Create a checkpoint of initial state
517 r = self.api.new_checkpoint('foo/a.ipynb')
517 r = self.api.new_checkpoint('foo/a.ipynb')
518 cp1 = r.json()
518 cp1 = r.json()
519
519
520 # Modify file and save
520 # Modify file and save
521 nbcontent = json.loads(orig.text)['content']
521 nbcontent = json.loads(orig.text)['content']
522 nb = from_dict(nbcontent)
522 nb = from_dict(nbcontent)
523 hcell = new_markdown_cell('Created by test')
523 hcell = new_markdown_cell('Created by test')
524 nb.cells.append(hcell)
524 nb.cells.append(hcell)
525 nbmodel = {'content': nb, 'type': 'notebook'}
525 nbmodel = {'content': nb, 'type': 'notebook'}
526 self.api.save('foo/a.ipynb', body=json.dumps(nbmodel))
526 self.api.save('foo/a.ipynb', body=json.dumps(nbmodel))
527
527
528 # Rename the file.
528 # Rename the file.
529 self.api.rename('foo/a.ipynb', 'foo/z.ipynb')
529 self.api.rename('foo/a.ipynb', 'foo/z.ipynb')
530
530
531 # Looking for checkpoints in the old location should yield no results.
531 # Looking for checkpoints in the old location should yield no results.
532 self.assertEqual(self.api.get_checkpoints('foo/a.ipynb').json(), [])
532 self.assertEqual(self.api.get_checkpoints('foo/a.ipynb').json(), [])
533
533
534 # Looking for checkpoints in the new location should work.
534 # Looking for checkpoints in the new location should work.
535 cps = self.api.get_checkpoints('foo/z.ipynb').json()
535 cps = self.api.get_checkpoints('foo/z.ipynb').json()
536 self.assertEqual(len(cps), 1)
536 self.assertEqual(cps, [cp1])
537 self.assertEqual(cps[0], cp1)
537
538 # Delete the file. The checkpoint should be deleted as well.
539 self.api.delete('foo/z.ipynb')
540 cps = self.api.get_checkpoints('foo/z.ipynb').json()
541 self.assertEqual(cps, [])
538
542
539 def test_rename_existing(self):
543 def test_rename_existing(self):
540 with assert_http_error(409):
544 with assert_http_error(409):
541 self.api.rename('foo/a.ipynb', 'foo/b.ipynb')
545 self.api.rename('foo/a.ipynb', 'foo/b.ipynb')
542
546
543 def test_save(self):
547 def test_save(self):
544 resp = self.api.read('foo/a.ipynb')
548 resp = self.api.read('foo/a.ipynb')
545 nbcontent = json.loads(resp.text)['content']
549 nbcontent = json.loads(resp.text)['content']
546 nb = from_dict(nbcontent)
550 nb = from_dict(nbcontent)
547 nb.cells.append(new_markdown_cell(u'Created by test Β³'))
551 nb.cells.append(new_markdown_cell(u'Created by test Β³'))
548
552
549 nbmodel = {'content': nb, 'type': 'notebook'}
553 nbmodel = {'content': nb, 'type': 'notebook'}
550 resp = self.api.save('foo/a.ipynb', body=json.dumps(nbmodel))
554 resp = self.api.save('foo/a.ipynb', body=json.dumps(nbmodel))
551
555
552 nbcontent = self.api.read('foo/a.ipynb').json()['content']
556 nbcontent = self.api.read('foo/a.ipynb').json()['content']
553 newnb = from_dict(nbcontent)
557 newnb = from_dict(nbcontent)
554 self.assertEqual(newnb.cells[0].source,
558 self.assertEqual(newnb.cells[0].source,
555 u'Created by test Β³')
559 u'Created by test Β³')
556
560
557 def test_checkpoints(self):
561 def test_checkpoints(self):
558 resp = self.api.read('foo/a.ipynb')
562 resp = self.api.read('foo/a.ipynb')
559 r = self.api.new_checkpoint('foo/a.ipynb')
563 r = self.api.new_checkpoint('foo/a.ipynb')
560 self.assertEqual(r.status_code, 201)
564 self.assertEqual(r.status_code, 201)
561 cp1 = r.json()
565 cp1 = r.json()
562 self.assertEqual(set(cp1), {'id', 'last_modified'})
566 self.assertEqual(set(cp1), {'id', 'last_modified'})
563 self.assertEqual(r.headers['Location'].split('/')[-1], cp1['id'])
567 self.assertEqual(r.headers['Location'].split('/')[-1], cp1['id'])
564
568
565 # Modify it
569 # Modify it
566 nbcontent = json.loads(resp.text)['content']
570 nbcontent = json.loads(resp.text)['content']
567 nb = from_dict(nbcontent)
571 nb = from_dict(nbcontent)
568 hcell = new_markdown_cell('Created by test')
572 hcell = new_markdown_cell('Created by test')
569 nb.cells.append(hcell)
573 nb.cells.append(hcell)
570 # Save
574 # Save
571 nbmodel= {'content': nb, 'type': 'notebook'}
575 nbmodel= {'content': nb, 'type': 'notebook'}
572 resp = self.api.save('foo/a.ipynb', body=json.dumps(nbmodel))
576 resp = self.api.save('foo/a.ipynb', body=json.dumps(nbmodel))
573
577
574 # List checkpoints
578 # List checkpoints
575 cps = self.api.get_checkpoints('foo/a.ipynb').json()
579 cps = self.api.get_checkpoints('foo/a.ipynb').json()
576 self.assertEqual(cps, [cp1])
580 self.assertEqual(cps, [cp1])
577
581
578 nbcontent = self.api.read('foo/a.ipynb').json()['content']
582 nbcontent = self.api.read('foo/a.ipynb').json()['content']
579 nb = from_dict(nbcontent)
583 nb = from_dict(nbcontent)
580 self.assertEqual(nb.cells[0].source, 'Created by test')
584 self.assertEqual(nb.cells[0].source, 'Created by test')
581
585
582 # Restore cp1
586 # Restore cp1
583 r = self.api.restore_checkpoint('foo/a.ipynb', cp1['id'])
587 r = self.api.restore_checkpoint('foo/a.ipynb', cp1['id'])
584 self.assertEqual(r.status_code, 204)
588 self.assertEqual(r.status_code, 204)
585 nbcontent = self.api.read('foo/a.ipynb').json()['content']
589 nbcontent = self.api.read('foo/a.ipynb').json()['content']
586 nb = from_dict(nbcontent)
590 nb = from_dict(nbcontent)
587 self.assertEqual(nb.cells, [])
591 self.assertEqual(nb.cells, [])
588
592
589 # Delete cp1
593 # Delete cp1
590 r = self.api.delete_checkpoint('foo/a.ipynb', cp1['id'])
594 r = self.api.delete_checkpoint('foo/a.ipynb', cp1['id'])
591 self.assertEqual(r.status_code, 204)
595 self.assertEqual(r.status_code, 204)
592 cps = self.api.get_checkpoints('foo/a.ipynb').json()
596 cps = self.api.get_checkpoints('foo/a.ipynb').json()
593 self.assertEqual(cps, [])
597 self.assertEqual(cps, [])
594
598
595 def test_file_checkpoints(self):
599 def test_file_checkpoints(self):
596 """
600 """
597 Test checkpointing of non-notebook files.
601 Test checkpointing of non-notebook files.
598 """
602 """
599 filename = 'foo/a.txt'
603 filename = 'foo/a.txt'
600 resp = self.api.read(filename)
604 resp = self.api.read(filename)
601 orig_content = json.loads(resp.text)['content']
605 orig_content = json.loads(resp.text)['content']
602
606
603 # Create a checkpoint.
607 # Create a checkpoint.
604 r = self.api.new_checkpoint(filename)
608 r = self.api.new_checkpoint(filename)
605 self.assertEqual(r.status_code, 201)
609 self.assertEqual(r.status_code, 201)
606 cp1 = r.json()
610 cp1 = r.json()
607 self.assertEqual(set(cp1), {'id', 'last_modified'})
611 self.assertEqual(set(cp1), {'id', 'last_modified'})
608 self.assertEqual(r.headers['Location'].split('/')[-1], cp1['id'])
612 self.assertEqual(r.headers['Location'].split('/')[-1], cp1['id'])
609
613
610 # Modify the file and save.
614 # Modify the file and save.
611 new_content = orig_content + '\nsecond line'
615 new_content = orig_content + '\nsecond line'
612 model = {
616 model = {
613 'content': new_content,
617 'content': new_content,
614 'type': 'file',
618 'type': 'file',
615 'format': 'text',
619 'format': 'text',
616 }
620 }
617 resp = self.api.save(filename, body=json.dumps(model))
621 resp = self.api.save(filename, body=json.dumps(model))
618
622
619 # List checkpoints
623 # List checkpoints
620 cps = self.api.get_checkpoints(filename).json()
624 cps = self.api.get_checkpoints(filename).json()
621 self.assertEqual(cps, [cp1])
625 self.assertEqual(cps, [cp1])
622
626
623 content = self.api.read(filename).json()['content']
627 content = self.api.read(filename).json()['content']
624 self.assertEqual(content, new_content)
628 self.assertEqual(content, new_content)
625
629
626 # Restore cp1
630 # Restore cp1
627 r = self.api.restore_checkpoint(filename, cp1['id'])
631 r = self.api.restore_checkpoint(filename, cp1['id'])
628 self.assertEqual(r.status_code, 204)
632 self.assertEqual(r.status_code, 204)
629 restored_content = self.api.read(filename).json()['content']
633 restored_content = self.api.read(filename).json()['content']
630 self.assertEqual(restored_content, orig_content)
634 self.assertEqual(restored_content, orig_content)
631
635
632 # Delete cp1
636 # Delete cp1
633 r = self.api.delete_checkpoint(filename, cp1['id'])
637 r = self.api.delete_checkpoint(filename, cp1['id'])
634 self.assertEqual(r.status_code, 204)
638 self.assertEqual(r.status_code, 204)
635 cps = self.api.get_checkpoints(filename).json()
639 cps = self.api.get_checkpoints(filename).json()
636 self.assertEqual(cps, [])
640 self.assertEqual(cps, [])
637
641
638 @contextmanager
642 @contextmanager
639 def patch_cp_root(self, dirname):
643 def patch_cp_root(self, dirname):
640 """
644 """
641 Temporarily patch the root dir of our checkpoint manager.
645 Temporarily patch the root dir of our checkpoint manager.
642 """
646 """
643 cpm = self.notebook.contents_manager.checkpoints
647 cpm = self.notebook.contents_manager.checkpoints
644 old_dirname = cpm.root_dir
648 old_dirname = cpm.root_dir
645 cpm.root_dir = dirname
649 cpm.root_dir = dirname
646 try:
650 try:
647 yield
651 yield
648 finally:
652 finally:
649 cpm.root_dir = old_dirname
653 cpm.root_dir = old_dirname
650
654
651 def test_checkpoints_separate_root(self):
655 def test_checkpoints_separate_root(self):
652 """
656 """
653 Test that FileCheckpoints functions correctly even when it's
657 Test that FileCheckpoints functions correctly even when it's
654 using a different root dir from FileContentsManager. This also keeps
658 using a different root dir from FileContentsManager. This also keeps
655 the implementation honest for use with ContentsManagers that don't map
659 the implementation honest for use with ContentsManagers that don't map
656 models to the filesystem
660 models to the filesystem
657
661
658 Override this method to a no-op when testing other managers.
662 Override this method to a no-op when testing other managers.
659 """
663 """
660 with TemporaryDirectory() as td:
664 with TemporaryDirectory() as td:
661 with self.patch_cp_root(td):
665 with self.patch_cp_root(td):
662 self.test_checkpoints()
666 self.test_checkpoints()
663
667
664 with TemporaryDirectory() as td:
668 with TemporaryDirectory() as td:
665 with self.patch_cp_root(td):
669 with self.patch_cp_root(td):
666 self.test_file_checkpoints()
670 self.test_file_checkpoints()
667
671
668
672
669 class GenericFileCheckpointsAPITest(APITest):
673 class GenericFileCheckpointsAPITest(APITest):
670 """
674 """
671 Run the tests from APITest with GenericFileCheckpoints.
675 Run the tests from APITest with GenericFileCheckpoints.
672 """
676 """
673 config = Config()
677 config = Config()
674 config.FileContentsManager.checkpoints_class = GenericFileCheckpoints
678 config.FileContentsManager.checkpoints_class = GenericFileCheckpoints
675
679
676 def test_config_did_something(self):
680 def test_config_did_something(self):
677
681
678 self.assertIsInstance(
682 self.assertIsInstance(
679 self.notebook.contents_manager.checkpoints,
683 self.notebook.contents_manager.checkpoints,
680 GenericFileCheckpoints,
684 GenericFileCheckpoints,
681 )
685 )
682
686
683
687
General Comments 0
You need to be logged in to leave comments. Login now