##// END OF EJS Templates
Backport PR #8102: TEST: Add test for renaming files with checkpoint....
Min RK -
Show More
@@ -1,655 +1,687 b''
1 # coding: utf-8
1 # coding: utf-8
2 """Test the contents webservice API."""
2 """Test the contents webservice API."""
3
3
4 import base64
4 import base64
5 from contextlib import contextmanager
5 from contextlib import contextmanager
6 import io
6 import io
7 import json
7 import json
8 import os
8 import os
9 import shutil
9 import shutil
10 from unicodedata import normalize
10 from unicodedata import normalize
11
11
12 pjoin = os.path.join
12 pjoin = os.path.join
13
13
14 import requests
14 import requests
15
15
16 from ..filecheckpoints import GenericFileCheckpoints
16 from ..filecheckpoints import GenericFileCheckpoints
17
17
18 from IPython.config import Config
18 from IPython.config import Config
19 from IPython.html.utils import url_path_join, url_escape, to_os_path
19 from IPython.html.utils import url_path_join, url_escape, to_os_path
20 from IPython.html.tests.launchnotebook import NotebookTestBase, assert_http_error
20 from IPython.html.tests.launchnotebook import NotebookTestBase, assert_http_error
21 from IPython.nbformat import read, write, from_dict
21 from IPython.nbformat import read, write, from_dict
22 from IPython.nbformat.v4 import (
22 from IPython.nbformat.v4 import (
23 new_notebook, new_markdown_cell,
23 new_notebook, new_markdown_cell,
24 )
24 )
25 from IPython.nbformat import v2
25 from IPython.nbformat import v2
26 from IPython.utils import py3compat
26 from IPython.utils import py3compat
27 from IPython.utils.data import uniq_stable
27 from IPython.utils.data import uniq_stable
28 from IPython.utils.tempdir import TemporaryDirectory
28 from IPython.utils.tempdir import TemporaryDirectory
29
29
30
30
31 def notebooks_only(dir_model):
31 def notebooks_only(dir_model):
32 return [nb for nb in dir_model['content'] if nb['type']=='notebook']
32 return [nb for nb in dir_model['content'] if nb['type']=='notebook']
33
33
34 def dirs_only(dir_model):
34 def dirs_only(dir_model):
35 return [x for x in dir_model['content'] if x['type']=='directory']
35 return [x for x in dir_model['content'] if x['type']=='directory']
36
36
37
37
38 class API(object):
38 class API(object):
39 """Wrapper for contents API calls."""
39 """Wrapper for contents API calls."""
40 def __init__(self, base_url):
40 def __init__(self, base_url):
41 self.base_url = base_url
41 self.base_url = base_url
42
42
43 def _req(self, verb, path, body=None, params=None):
43 def _req(self, verb, path, body=None, params=None):
44 response = requests.request(verb,
44 response = requests.request(verb,
45 url_path_join(self.base_url, 'api/contents', path),
45 url_path_join(self.base_url, 'api/contents', path),
46 data=body, params=params,
46 data=body, params=params,
47 )
47 )
48 response.raise_for_status()
48 response.raise_for_status()
49 return response
49 return response
50
50
51 def list(self, path='/'):
51 def list(self, path='/'):
52 return self._req('GET', path)
52 return self._req('GET', path)
53
53
54 def read(self, path, type=None, format=None, content=None):
54 def read(self, path, type=None, format=None, content=None):
55 params = {}
55 params = {}
56 if type is not None:
56 if type is not None:
57 params['type'] = type
57 params['type'] = type
58 if format is not None:
58 if format is not None:
59 params['format'] = format
59 params['format'] = format
60 if content == False:
60 if content == False:
61 params['content'] = '0'
61 params['content'] = '0'
62 return self._req('GET', path, params=params)
62 return self._req('GET', path, params=params)
63
63
64 def create_untitled(self, path='/', ext='.ipynb'):
64 def create_untitled(self, path='/', ext='.ipynb'):
65 body = None
65 body = None
66 if ext:
66 if ext:
67 body = json.dumps({'ext': ext})
67 body = json.dumps({'ext': ext})
68 return self._req('POST', path, body)
68 return self._req('POST', path, body)
69
69
70 def mkdir_untitled(self, path='/'):
70 def mkdir_untitled(self, path='/'):
71 return self._req('POST', path, json.dumps({'type': 'directory'}))
71 return self._req('POST', path, json.dumps({'type': 'directory'}))
72
72
73 def copy(self, copy_from, path='/'):
73 def copy(self, copy_from, path='/'):
74 body = json.dumps({'copy_from':copy_from})
74 body = json.dumps({'copy_from':copy_from})
75 return self._req('POST', path, body)
75 return self._req('POST', path, body)
76
76
77 def create(self, path='/'):
77 def create(self, path='/'):
78 return self._req('PUT', path)
78 return self._req('PUT', path)
79
79
80 def upload(self, path, body):
80 def upload(self, path, body):
81 return self._req('PUT', path, body)
81 return self._req('PUT', path, body)
82
82
83 def mkdir(self, path='/'):
83 def mkdir(self, path='/'):
84 return self._req('PUT', path, json.dumps({'type': 'directory'}))
84 return self._req('PUT', path, json.dumps({'type': 'directory'}))
85
85
86 def copy_put(self, copy_from, path='/'):
86 def copy_put(self, copy_from, path='/'):
87 body = json.dumps({'copy_from':copy_from})
87 body = json.dumps({'copy_from':copy_from})
88 return self._req('PUT', path, body)
88 return self._req('PUT', path, body)
89
89
90 def save(self, path, body):
90 def save(self, path, body):
91 return self._req('PUT', path, body)
91 return self._req('PUT', path, body)
92
92
93 def delete(self, path='/'):
93 def delete(self, path='/'):
94 return self._req('DELETE', path)
94 return self._req('DELETE', path)
95
95
96 def rename(self, path, new_path):
96 def rename(self, path, new_path):
97 body = json.dumps({'path': new_path})
97 body = json.dumps({'path': new_path})
98 return self._req('PATCH', path, body)
98 return self._req('PATCH', path, body)
99
99
100 def get_checkpoints(self, path):
100 def get_checkpoints(self, path):
101 return self._req('GET', url_path_join(path, 'checkpoints'))
101 return self._req('GET', url_path_join(path, 'checkpoints'))
102
102
103 def new_checkpoint(self, path):
103 def new_checkpoint(self, path):
104 return self._req('POST', url_path_join(path, 'checkpoints'))
104 return self._req('POST', url_path_join(path, 'checkpoints'))
105
105
106 def restore_checkpoint(self, path, checkpoint_id):
106 def restore_checkpoint(self, path, checkpoint_id):
107 return self._req('POST', url_path_join(path, 'checkpoints', checkpoint_id))
107 return self._req('POST', url_path_join(path, 'checkpoints', checkpoint_id))
108
108
109 def delete_checkpoint(self, path, checkpoint_id):
109 def delete_checkpoint(self, path, checkpoint_id):
110 return self._req('DELETE', url_path_join(path, 'checkpoints', checkpoint_id))
110 return self._req('DELETE', url_path_join(path, 'checkpoints', checkpoint_id))
111
111
112 class APITest(NotebookTestBase):
112 class APITest(NotebookTestBase):
113 """Test the kernels web service API"""
113 """Test the kernels web service API"""
114 dirs_nbs = [('', 'inroot'),
114 dirs_nbs = [('', 'inroot'),
115 ('Directory with spaces in', 'inspace'),
115 ('Directory with spaces in', 'inspace'),
116 (u'unicodΓ©', 'innonascii'),
116 (u'unicodΓ©', 'innonascii'),
117 ('foo', 'a'),
117 ('foo', 'a'),
118 ('foo', 'b'),
118 ('foo', 'b'),
119 ('foo', 'name with spaces'),
119 ('foo', 'name with spaces'),
120 ('foo', u'unicodΓ©'),
120 ('foo', u'unicodΓ©'),
121 ('foo/bar', 'baz'),
121 ('foo/bar', 'baz'),
122 ('ordering', 'A'),
122 ('ordering', 'A'),
123 ('ordering', 'b'),
123 ('ordering', 'b'),
124 ('ordering', 'C'),
124 ('ordering', 'C'),
125 (u'Γ₯ b', u'Γ§ d'),
125 (u'Γ₯ b', u'Γ§ d'),
126 ]
126 ]
127 hidden_dirs = ['.hidden', '__pycache__']
127 hidden_dirs = ['.hidden', '__pycache__']
128
128
129 # Don't include root dir.
129 # Don't include root dir.
130 dirs = uniq_stable([py3compat.cast_unicode(d) for (d,n) in dirs_nbs[1:]])
130 dirs = uniq_stable([py3compat.cast_unicode(d) for (d,n) in dirs_nbs[1:]])
131 top_level_dirs = {normalize('NFC', d.split('/')[0]) for d in dirs}
131 top_level_dirs = {normalize('NFC', d.split('/')[0]) for d in dirs}
132
132
133 @staticmethod
133 @staticmethod
134 def _blob_for_name(name):
134 def _blob_for_name(name):
135 return name.encode('utf-8') + b'\xFF'
135 return name.encode('utf-8') + b'\xFF'
136
136
137 @staticmethod
137 @staticmethod
138 def _txt_for_name(name):
138 def _txt_for_name(name):
139 return u'%s text file' % name
139 return u'%s text file' % name
140
140
141 def to_os_path(self, api_path):
141 def to_os_path(self, api_path):
142 return to_os_path(api_path, root=self.notebook_dir.name)
142 return to_os_path(api_path, root=self.notebook_dir.name)
143
143
144 def make_dir(self, api_path):
144 def make_dir(self, api_path):
145 """Create a directory at api_path"""
145 """Create a directory at api_path"""
146 os_path = self.to_os_path(api_path)
146 os_path = self.to_os_path(api_path)
147 try:
147 try:
148 os.makedirs(os_path)
148 os.makedirs(os_path)
149 except OSError:
149 except OSError:
150 print("Directory already exists: %r" % os_path)
150 print("Directory already exists: %r" % os_path)
151
151
152 def make_txt(self, api_path, txt):
152 def make_txt(self, api_path, txt):
153 """Make a text file at a given api_path"""
153 """Make a text file at a given api_path"""
154 os_path = self.to_os_path(api_path)
154 os_path = self.to_os_path(api_path)
155 with io.open(os_path, 'w', encoding='utf-8') as f:
155 with io.open(os_path, 'w', encoding='utf-8') as f:
156 f.write(txt)
156 f.write(txt)
157
157
158 def make_blob(self, api_path, blob):
158 def make_blob(self, api_path, blob):
159 """Make a binary file at a given api_path"""
159 """Make a binary file at a given api_path"""
160 os_path = self.to_os_path(api_path)
160 os_path = self.to_os_path(api_path)
161 with io.open(os_path, 'wb') as f:
161 with io.open(os_path, 'wb') as f:
162 f.write(blob)
162 f.write(blob)
163
163
164 def make_nb(self, api_path, nb):
164 def make_nb(self, api_path, nb):
165 """Make a notebook file at a given api_path"""
165 """Make a notebook file at a given api_path"""
166 os_path = self.to_os_path(api_path)
166 os_path = self.to_os_path(api_path)
167
167
168 with io.open(os_path, 'w', encoding='utf-8') as f:
168 with io.open(os_path, 'w', encoding='utf-8') as f:
169 write(nb, f, version=4)
169 write(nb, f, version=4)
170
170
171 def delete_dir(self, api_path):
171 def delete_dir(self, api_path):
172 """Delete a directory at api_path, removing any contents."""
172 """Delete a directory at api_path, removing any contents."""
173 os_path = self.to_os_path(api_path)
173 os_path = self.to_os_path(api_path)
174 shutil.rmtree(os_path, ignore_errors=True)
174 shutil.rmtree(os_path, ignore_errors=True)
175
175
176 def delete_file(self, api_path):
176 def delete_file(self, api_path):
177 """Delete a file at the given path if it exists."""
177 """Delete a file at the given path if it exists."""
178 if self.isfile(api_path):
178 if self.isfile(api_path):
179 os.unlink(self.to_os_path(api_path))
179 os.unlink(self.to_os_path(api_path))
180
180
181 def isfile(self, api_path):
181 def isfile(self, api_path):
182 return os.path.isfile(self.to_os_path(api_path))
182 return os.path.isfile(self.to_os_path(api_path))
183
183
184 def isdir(self, api_path):
184 def isdir(self, api_path):
185 return os.path.isdir(self.to_os_path(api_path))
185 return os.path.isdir(self.to_os_path(api_path))
186
186
187 def setUp(self):
187 def setUp(self):
188
188
189 for d in (self.dirs + self.hidden_dirs):
189 for d in (self.dirs + self.hidden_dirs):
190 self.make_dir(d)
190 self.make_dir(d)
191
191
192 for d, name in self.dirs_nbs:
192 for d, name in self.dirs_nbs:
193 # create a notebook
193 # create a notebook
194 nb = new_notebook()
194 nb = new_notebook()
195 self.make_nb(u'{}/{}.ipynb'.format(d, name), nb)
195 self.make_nb(u'{}/{}.ipynb'.format(d, name), nb)
196
196
197 # create a text file
197 # create a text file
198 txt = self._txt_for_name(name)
198 txt = self._txt_for_name(name)
199 self.make_txt(u'{}/{}.txt'.format(d, name), txt)
199 self.make_txt(u'{}/{}.txt'.format(d, name), txt)
200
200
201 # create a binary file
201 # create a binary file
202 blob = self._blob_for_name(name)
202 blob = self._blob_for_name(name)
203 self.make_blob(u'{}/{}.blob'.format(d, name), blob)
203 self.make_blob(u'{}/{}.blob'.format(d, name), blob)
204
204
205 self.api = API(self.base_url())
205 self.api = API(self.base_url())
206
206
207 def tearDown(self):
207 def tearDown(self):
208 for dname in (list(self.top_level_dirs) + self.hidden_dirs):
208 for dname in (list(self.top_level_dirs) + self.hidden_dirs):
209 self.delete_dir(dname)
209 self.delete_dir(dname)
210 self.delete_file('inroot.ipynb')
210 self.delete_file('inroot.ipynb')
211
211
212 def test_list_notebooks(self):
212 def test_list_notebooks(self):
213 nbs = notebooks_only(self.api.list().json())
213 nbs = notebooks_only(self.api.list().json())
214 self.assertEqual(len(nbs), 1)
214 self.assertEqual(len(nbs), 1)
215 self.assertEqual(nbs[0]['name'], 'inroot.ipynb')
215 self.assertEqual(nbs[0]['name'], 'inroot.ipynb')
216
216
217 nbs = notebooks_only(self.api.list('/Directory with spaces in/').json())
217 nbs = notebooks_only(self.api.list('/Directory with spaces in/').json())
218 self.assertEqual(len(nbs), 1)
218 self.assertEqual(len(nbs), 1)
219 self.assertEqual(nbs[0]['name'], 'inspace.ipynb')
219 self.assertEqual(nbs[0]['name'], 'inspace.ipynb')
220
220
221 nbs = notebooks_only(self.api.list(u'/unicodΓ©/').json())
221 nbs = notebooks_only(self.api.list(u'/unicodΓ©/').json())
222 self.assertEqual(len(nbs), 1)
222 self.assertEqual(len(nbs), 1)
223 self.assertEqual(nbs[0]['name'], 'innonascii.ipynb')
223 self.assertEqual(nbs[0]['name'], 'innonascii.ipynb')
224 self.assertEqual(nbs[0]['path'], u'unicodΓ©/innonascii.ipynb')
224 self.assertEqual(nbs[0]['path'], u'unicodΓ©/innonascii.ipynb')
225
225
226 nbs = notebooks_only(self.api.list('/foo/bar/').json())
226 nbs = notebooks_only(self.api.list('/foo/bar/').json())
227 self.assertEqual(len(nbs), 1)
227 self.assertEqual(len(nbs), 1)
228 self.assertEqual(nbs[0]['name'], 'baz.ipynb')
228 self.assertEqual(nbs[0]['name'], 'baz.ipynb')
229 self.assertEqual(nbs[0]['path'], 'foo/bar/baz.ipynb')
229 self.assertEqual(nbs[0]['path'], 'foo/bar/baz.ipynb')
230
230
231 nbs = notebooks_only(self.api.list('foo').json())
231 nbs = notebooks_only(self.api.list('foo').json())
232 self.assertEqual(len(nbs), 4)
232 self.assertEqual(len(nbs), 4)
233 nbnames = { normalize('NFC', n['name']) for n in nbs }
233 nbnames = { normalize('NFC', n['name']) for n in nbs }
234 expected = [ u'a.ipynb', u'b.ipynb', u'name with spaces.ipynb', u'unicodΓ©.ipynb']
234 expected = [ u'a.ipynb', u'b.ipynb', u'name with spaces.ipynb', u'unicodΓ©.ipynb']
235 expected = { normalize('NFC', name) for name in expected }
235 expected = { normalize('NFC', name) for name in expected }
236 self.assertEqual(nbnames, expected)
236 self.assertEqual(nbnames, expected)
237
237
238 nbs = notebooks_only(self.api.list('ordering').json())
238 nbs = notebooks_only(self.api.list('ordering').json())
239 nbnames = [n['name'] for n in nbs]
239 nbnames = [n['name'] for n in nbs]
240 expected = ['A.ipynb', 'b.ipynb', 'C.ipynb']
240 expected = ['A.ipynb', 'b.ipynb', 'C.ipynb']
241 self.assertEqual(nbnames, expected)
241 self.assertEqual(nbnames, expected)
242
242
243 def test_list_dirs(self):
243 def test_list_dirs(self):
244 dirs = dirs_only(self.api.list().json())
244 dirs = dirs_only(self.api.list().json())
245 dir_names = {normalize('NFC', d['name']) for d in dirs}
245 dir_names = {normalize('NFC', d['name']) for d in dirs}
246 self.assertEqual(dir_names, self.top_level_dirs) # Excluding hidden dirs
246 self.assertEqual(dir_names, self.top_level_dirs) # Excluding hidden dirs
247
247
248 def test_get_dir_no_content(self):
248 def test_get_dir_no_content(self):
249 for d in self.dirs:
249 for d in self.dirs:
250 model = self.api.read(d, content=False).json()
250 model = self.api.read(d, content=False).json()
251 self.assertEqual(model['path'], d)
251 self.assertEqual(model['path'], d)
252 self.assertEqual(model['type'], 'directory')
252 self.assertEqual(model['type'], 'directory')
253 self.assertIn('content', model)
253 self.assertIn('content', model)
254 self.assertEqual(model['content'], None)
254 self.assertEqual(model['content'], None)
255
255
256 def test_list_nonexistant_dir(self):
256 def test_list_nonexistant_dir(self):
257 with assert_http_error(404):
257 with assert_http_error(404):
258 self.api.list('nonexistant')
258 self.api.list('nonexistant')
259
259
260 def test_get_nb_contents(self):
260 def test_get_nb_contents(self):
261 for d, name in self.dirs_nbs:
261 for d, name in self.dirs_nbs:
262 path = url_path_join(d, name + '.ipynb')
262 path = url_path_join(d, name + '.ipynb')
263 nb = self.api.read(path).json()
263 nb = self.api.read(path).json()
264 self.assertEqual(nb['name'], u'%s.ipynb' % name)
264 self.assertEqual(nb['name'], u'%s.ipynb' % name)
265 self.assertEqual(nb['path'], path)
265 self.assertEqual(nb['path'], path)
266 self.assertEqual(nb['type'], 'notebook')
266 self.assertEqual(nb['type'], 'notebook')
267 self.assertIn('content', nb)
267 self.assertIn('content', nb)
268 self.assertEqual(nb['format'], 'json')
268 self.assertEqual(nb['format'], 'json')
269 self.assertIn('metadata', nb['content'])
269 self.assertIn('metadata', nb['content'])
270 self.assertIsInstance(nb['content']['metadata'], dict)
270 self.assertIsInstance(nb['content']['metadata'], dict)
271
271
272 def test_get_nb_no_content(self):
272 def test_get_nb_no_content(self):
273 for d, name in self.dirs_nbs:
273 for d, name in self.dirs_nbs:
274 path = url_path_join(d, name + '.ipynb')
274 path = url_path_join(d, name + '.ipynb')
275 nb = self.api.read(path, content=False).json()
275 nb = self.api.read(path, content=False).json()
276 self.assertEqual(nb['name'], u'%s.ipynb' % name)
276 self.assertEqual(nb['name'], u'%s.ipynb' % name)
277 self.assertEqual(nb['path'], path)
277 self.assertEqual(nb['path'], path)
278 self.assertEqual(nb['type'], 'notebook')
278 self.assertEqual(nb['type'], 'notebook')
279 self.assertIn('content', nb)
279 self.assertIn('content', nb)
280 self.assertEqual(nb['content'], None)
280 self.assertEqual(nb['content'], None)
281
281
282 def test_get_contents_no_such_file(self):
282 def test_get_contents_no_such_file(self):
283 # Name that doesn't exist - should be a 404
283 # Name that doesn't exist - should be a 404
284 with assert_http_error(404):
284 with assert_http_error(404):
285 self.api.read('foo/q.ipynb')
285 self.api.read('foo/q.ipynb')
286
286
287 def test_get_text_file_contents(self):
287 def test_get_text_file_contents(self):
288 for d, name in self.dirs_nbs:
288 for d, name in self.dirs_nbs:
289 path = url_path_join(d, name + '.txt')
289 path = url_path_join(d, name + '.txt')
290 model = self.api.read(path).json()
290 model = self.api.read(path).json()
291 self.assertEqual(model['name'], u'%s.txt' % name)
291 self.assertEqual(model['name'], u'%s.txt' % name)
292 self.assertEqual(model['path'], path)
292 self.assertEqual(model['path'], path)
293 self.assertIn('content', model)
293 self.assertIn('content', model)
294 self.assertEqual(model['format'], 'text')
294 self.assertEqual(model['format'], 'text')
295 self.assertEqual(model['type'], 'file')
295 self.assertEqual(model['type'], 'file')
296 self.assertEqual(model['content'], self._txt_for_name(name))
296 self.assertEqual(model['content'], self._txt_for_name(name))
297
297
298 # Name that doesn't exist - should be a 404
298 # Name that doesn't exist - should be a 404
299 with assert_http_error(404):
299 with assert_http_error(404):
300 self.api.read('foo/q.txt')
300 self.api.read('foo/q.txt')
301
301
302 # Specifying format=text should fail on a non-UTF-8 file
302 # Specifying format=text should fail on a non-UTF-8 file
303 with assert_http_error(400):
303 with assert_http_error(400):
304 self.api.read('foo/bar/baz.blob', type='file', format='text')
304 self.api.read('foo/bar/baz.blob', type='file', format='text')
305
305
306 def test_get_binary_file_contents(self):
306 def test_get_binary_file_contents(self):
307 for d, name in self.dirs_nbs:
307 for d, name in self.dirs_nbs:
308 path = url_path_join(d, name + '.blob')
308 path = url_path_join(d, name + '.blob')
309 model = self.api.read(path).json()
309 model = self.api.read(path).json()
310 self.assertEqual(model['name'], u'%s.blob' % name)
310 self.assertEqual(model['name'], u'%s.blob' % name)
311 self.assertEqual(model['path'], path)
311 self.assertEqual(model['path'], path)
312 self.assertIn('content', model)
312 self.assertIn('content', model)
313 self.assertEqual(model['format'], 'base64')
313 self.assertEqual(model['format'], 'base64')
314 self.assertEqual(model['type'], 'file')
314 self.assertEqual(model['type'], 'file')
315 self.assertEqual(
315 self.assertEqual(
316 base64.decodestring(model['content'].encode('ascii')),
316 base64.decodestring(model['content'].encode('ascii')),
317 self._blob_for_name(name),
317 self._blob_for_name(name),
318 )
318 )
319
319
320 # Name that doesn't exist - should be a 404
320 # Name that doesn't exist - should be a 404
321 with assert_http_error(404):
321 with assert_http_error(404):
322 self.api.read('foo/q.txt')
322 self.api.read('foo/q.txt')
323
323
324 def test_get_bad_type(self):
324 def test_get_bad_type(self):
325 with assert_http_error(400):
325 with assert_http_error(400):
326 self.api.read(u'unicodΓ©', type='file') # this is a directory
326 self.api.read(u'unicodΓ©', type='file') # this is a directory
327
327
328 with assert_http_error(400):
328 with assert_http_error(400):
329 self.api.read(u'unicodΓ©/innonascii.ipynb', type='directory')
329 self.api.read(u'unicodΓ©/innonascii.ipynb', type='directory')
330
330
331 def _check_created(self, resp, path, type='notebook'):
331 def _check_created(self, resp, path, type='notebook'):
332 self.assertEqual(resp.status_code, 201)
332 self.assertEqual(resp.status_code, 201)
333 location_header = py3compat.str_to_unicode(resp.headers['Location'])
333 location_header = py3compat.str_to_unicode(resp.headers['Location'])
334 self.assertEqual(location_header, url_escape(url_path_join(u'/api/contents', path)))
334 self.assertEqual(location_header, url_escape(url_path_join(u'/api/contents', path)))
335 rjson = resp.json()
335 rjson = resp.json()
336 self.assertEqual(rjson['name'], path.rsplit('/', 1)[-1])
336 self.assertEqual(rjson['name'], path.rsplit('/', 1)[-1])
337 self.assertEqual(rjson['path'], path)
337 self.assertEqual(rjson['path'], path)
338 self.assertEqual(rjson['type'], type)
338 self.assertEqual(rjson['type'], type)
339 isright = self.isdir if type == 'directory' else self.isfile
339 isright = self.isdir if type == 'directory' else self.isfile
340 assert isright(path)
340 assert isright(path)
341
341
342 def test_create_untitled(self):
342 def test_create_untitled(self):
343 resp = self.api.create_untitled(path=u'Γ₯ b')
343 resp = self.api.create_untitled(path=u'Γ₯ b')
344 self._check_created(resp, u'Γ₯ b/Untitled.ipynb')
344 self._check_created(resp, u'Γ₯ b/Untitled.ipynb')
345
345
346 # Second time
346 # Second time
347 resp = self.api.create_untitled(path=u'Γ₯ b')
347 resp = self.api.create_untitled(path=u'Γ₯ b')
348 self._check_created(resp, u'Γ₯ b/Untitled1.ipynb')
348 self._check_created(resp, u'Γ₯ b/Untitled1.ipynb')
349
349
350 # And two directories down
350 # And two directories down
351 resp = self.api.create_untitled(path='foo/bar')
351 resp = self.api.create_untitled(path='foo/bar')
352 self._check_created(resp, 'foo/bar/Untitled.ipynb')
352 self._check_created(resp, 'foo/bar/Untitled.ipynb')
353
353
354 def test_create_untitled_txt(self):
354 def test_create_untitled_txt(self):
355 resp = self.api.create_untitled(path='foo/bar', ext='.txt')
355 resp = self.api.create_untitled(path='foo/bar', ext='.txt')
356 self._check_created(resp, 'foo/bar/untitled.txt', type='file')
356 self._check_created(resp, 'foo/bar/untitled.txt', type='file')
357
357
358 resp = self.api.read(path='foo/bar/untitled.txt')
358 resp = self.api.read(path='foo/bar/untitled.txt')
359 model = resp.json()
359 model = resp.json()
360 self.assertEqual(model['type'], 'file')
360 self.assertEqual(model['type'], 'file')
361 self.assertEqual(model['format'], 'text')
361 self.assertEqual(model['format'], 'text')
362 self.assertEqual(model['content'], '')
362 self.assertEqual(model['content'], '')
363
363
364 def test_upload(self):
364 def test_upload(self):
365 nb = new_notebook()
365 nb = new_notebook()
366 nbmodel = {'content': nb, 'type': 'notebook'}
366 nbmodel = {'content': nb, 'type': 'notebook'}
367 path = u'Γ₯ b/Upload tΓ©st.ipynb'
367 path = u'Γ₯ b/Upload tΓ©st.ipynb'
368 resp = self.api.upload(path, body=json.dumps(nbmodel))
368 resp = self.api.upload(path, body=json.dumps(nbmodel))
369 self._check_created(resp, path)
369 self._check_created(resp, path)
370
370
371 def test_mkdir_untitled(self):
371 def test_mkdir_untitled(self):
372 resp = self.api.mkdir_untitled(path=u'Γ₯ b')
372 resp = self.api.mkdir_untitled(path=u'Γ₯ b')
373 self._check_created(resp, u'Γ₯ b/Untitled Folder', type='directory')
373 self._check_created(resp, u'Γ₯ b/Untitled Folder', type='directory')
374
374
375 # Second time
375 # Second time
376 resp = self.api.mkdir_untitled(path=u'Γ₯ b')
376 resp = self.api.mkdir_untitled(path=u'Γ₯ b')
377 self._check_created(resp, u'Γ₯ b/Untitled Folder 1', type='directory')
377 self._check_created(resp, u'Γ₯ b/Untitled Folder 1', type='directory')
378
378
379 # And two directories down
379 # And two directories down
380 resp = self.api.mkdir_untitled(path='foo/bar')
380 resp = self.api.mkdir_untitled(path='foo/bar')
381 self._check_created(resp, 'foo/bar/Untitled Folder', type='directory')
381 self._check_created(resp, 'foo/bar/Untitled Folder', type='directory')
382
382
383 def test_mkdir(self):
383 def test_mkdir(self):
384 path = u'Γ₯ b/New βˆ‚ir'
384 path = u'Γ₯ b/New βˆ‚ir'
385 resp = self.api.mkdir(path)
385 resp = self.api.mkdir(path)
386 self._check_created(resp, path, type='directory')
386 self._check_created(resp, path, type='directory')
387
387
388 def test_mkdir_hidden_400(self):
388 def test_mkdir_hidden_400(self):
389 with assert_http_error(400):
389 with assert_http_error(400):
390 resp = self.api.mkdir(u'Γ₯ b/.hidden')
390 resp = self.api.mkdir(u'Γ₯ b/.hidden')
391
391
392 def test_upload_txt(self):
392 def test_upload_txt(self):
393 body = u'ΓΌnicode tΓ©xt'
393 body = u'ΓΌnicode tΓ©xt'
394 model = {
394 model = {
395 'content' : body,
395 'content' : body,
396 'format' : 'text',
396 'format' : 'text',
397 'type' : 'file',
397 'type' : 'file',
398 }
398 }
399 path = u'Γ₯ b/Upload tΓ©st.txt'
399 path = u'Γ₯ b/Upload tΓ©st.txt'
400 resp = self.api.upload(path, body=json.dumps(model))
400 resp = self.api.upload(path, body=json.dumps(model))
401
401
402 # check roundtrip
402 # check roundtrip
403 resp = self.api.read(path)
403 resp = self.api.read(path)
404 model = resp.json()
404 model = resp.json()
405 self.assertEqual(model['type'], 'file')
405 self.assertEqual(model['type'], 'file')
406 self.assertEqual(model['format'], 'text')
406 self.assertEqual(model['format'], 'text')
407 self.assertEqual(model['content'], body)
407 self.assertEqual(model['content'], body)
408
408
409 def test_upload_b64(self):
409 def test_upload_b64(self):
410 body = b'\xFFblob'
410 body = b'\xFFblob'
411 b64body = base64.encodestring(body).decode('ascii')
411 b64body = base64.encodestring(body).decode('ascii')
412 model = {
412 model = {
413 'content' : b64body,
413 'content' : b64body,
414 'format' : 'base64',
414 'format' : 'base64',
415 'type' : 'file',
415 'type' : 'file',
416 }
416 }
417 path = u'Γ₯ b/Upload tΓ©st.blob'
417 path = u'Γ₯ b/Upload tΓ©st.blob'
418 resp = self.api.upload(path, body=json.dumps(model))
418 resp = self.api.upload(path, body=json.dumps(model))
419
419
420 # check roundtrip
420 # check roundtrip
421 resp = self.api.read(path)
421 resp = self.api.read(path)
422 model = resp.json()
422 model = resp.json()
423 self.assertEqual(model['type'], 'file')
423 self.assertEqual(model['type'], 'file')
424 self.assertEqual(model['path'], path)
424 self.assertEqual(model['path'], path)
425 self.assertEqual(model['format'], 'base64')
425 self.assertEqual(model['format'], 'base64')
426 decoded = base64.decodestring(model['content'].encode('ascii'))
426 decoded = base64.decodestring(model['content'].encode('ascii'))
427 self.assertEqual(decoded, body)
427 self.assertEqual(decoded, body)
428
428
429 def test_upload_v2(self):
429 def test_upload_v2(self):
430 nb = v2.new_notebook()
430 nb = v2.new_notebook()
431 ws = v2.new_worksheet()
431 ws = v2.new_worksheet()
432 nb.worksheets.append(ws)
432 nb.worksheets.append(ws)
433 ws.cells.append(v2.new_code_cell(input='print("hi")'))
433 ws.cells.append(v2.new_code_cell(input='print("hi")'))
434 nbmodel = {'content': nb, 'type': 'notebook'}
434 nbmodel = {'content': nb, 'type': 'notebook'}
435 path = u'Γ₯ b/Upload tΓ©st.ipynb'
435 path = u'Γ₯ b/Upload tΓ©st.ipynb'
436 resp = self.api.upload(path, body=json.dumps(nbmodel))
436 resp = self.api.upload(path, body=json.dumps(nbmodel))
437 self._check_created(resp, path)
437 self._check_created(resp, path)
438 resp = self.api.read(path)
438 resp = self.api.read(path)
439 data = resp.json()
439 data = resp.json()
440 self.assertEqual(data['content']['nbformat'], 4)
440 self.assertEqual(data['content']['nbformat'], 4)
441
441
442 def test_copy(self):
442 def test_copy(self):
443 resp = self.api.copy(u'Γ₯ b/Γ§ d.ipynb', u'Γ₯ b')
443 resp = self.api.copy(u'Γ₯ b/Γ§ d.ipynb', u'Γ₯ b')
444 self._check_created(resp, u'Γ₯ b/Γ§ d-Copy1.ipynb')
444 self._check_created(resp, u'Γ₯ b/Γ§ d-Copy1.ipynb')
445
445
446 resp = self.api.copy(u'Γ₯ b/Γ§ d.ipynb', u'Γ₯ b')
446 resp = self.api.copy(u'Γ₯ b/Γ§ d.ipynb', u'Γ₯ b')
447 self._check_created(resp, u'Γ₯ b/Γ§ d-Copy2.ipynb')
447 self._check_created(resp, u'Γ₯ b/Γ§ d-Copy2.ipynb')
448
448
449 def test_copy_copy(self):
449 def test_copy_copy(self):
450 resp = self.api.copy(u'Γ₯ b/Γ§ d.ipynb', u'Γ₯ b')
450 resp = self.api.copy(u'Γ₯ b/Γ§ d.ipynb', u'Γ₯ b')
451 self._check_created(resp, u'Γ₯ b/Γ§ d-Copy1.ipynb')
451 self._check_created(resp, u'Γ₯ b/Γ§ d-Copy1.ipynb')
452
452
453 resp = self.api.copy(u'Γ₯ b/Γ§ d-Copy1.ipynb', u'Γ₯ b')
453 resp = self.api.copy(u'Γ₯ b/Γ§ d-Copy1.ipynb', u'Γ₯ b')
454 self._check_created(resp, u'Γ₯ b/Γ§ d-Copy2.ipynb')
454 self._check_created(resp, u'Γ₯ b/Γ§ d-Copy2.ipynb')
455
455
456 def test_copy_path(self):
456 def test_copy_path(self):
457 resp = self.api.copy(u'foo/a.ipynb', u'Γ₯ b')
457 resp = self.api.copy(u'foo/a.ipynb', u'Γ₯ b')
458 self._check_created(resp, u'Γ₯ b/a.ipynb')
458 self._check_created(resp, u'Γ₯ b/a.ipynb')
459
459
460 resp = self.api.copy(u'foo/a.ipynb', u'Γ₯ b')
460 resp = self.api.copy(u'foo/a.ipynb', u'Γ₯ b')
461 self._check_created(resp, u'Γ₯ b/a-Copy1.ipynb')
461 self._check_created(resp, u'Γ₯ b/a-Copy1.ipynb')
462
462
463 def test_copy_put_400(self):
463 def test_copy_put_400(self):
464 with assert_http_error(400):
464 with assert_http_error(400):
465 resp = self.api.copy_put(u'Γ₯ b/Γ§ d.ipynb', u'Γ₯ b/cΓΈpy.ipynb')
465 resp = self.api.copy_put(u'Γ₯ b/Γ§ d.ipynb', u'Γ₯ b/cΓΈpy.ipynb')
466
466
467 def test_copy_dir_400(self):
467 def test_copy_dir_400(self):
468 # can't copy directories
468 # can't copy directories
469 with assert_http_error(400):
469 with assert_http_error(400):
470 resp = self.api.copy(u'Γ₯ b', u'foo')
470 resp = self.api.copy(u'Γ₯ b', u'foo')
471
471
472 def test_delete(self):
472 def test_delete(self):
473 for d, name in self.dirs_nbs:
473 for d, name in self.dirs_nbs:
474 print('%r, %r' % (d, name))
474 print('%r, %r' % (d, name))
475 resp = self.api.delete(url_path_join(d, name + '.ipynb'))
475 resp = self.api.delete(url_path_join(d, name + '.ipynb'))
476 self.assertEqual(resp.status_code, 204)
476 self.assertEqual(resp.status_code, 204)
477
477
478 for d in self.dirs + ['/']:
478 for d in self.dirs + ['/']:
479 nbs = notebooks_only(self.api.list(d).json())
479 nbs = notebooks_only(self.api.list(d).json())
480 print('------')
480 print('------')
481 print(d)
481 print(d)
482 print(nbs)
482 print(nbs)
483 self.assertEqual(nbs, [])
483 self.assertEqual(nbs, [])
484
484
485 def test_delete_dirs(self):
485 def test_delete_dirs(self):
486 # depth-first delete everything, so we don't try to delete empty directories
486 # depth-first delete everything, so we don't try to delete empty directories
487 for name in sorted(self.dirs + ['/'], key=len, reverse=True):
487 for name in sorted(self.dirs + ['/'], key=len, reverse=True):
488 listing = self.api.list(name).json()['content']
488 listing = self.api.list(name).json()['content']
489 for model in listing:
489 for model in listing:
490 self.api.delete(model['path'])
490 self.api.delete(model['path'])
491 listing = self.api.list('/').json()['content']
491 listing = self.api.list('/').json()['content']
492 self.assertEqual(listing, [])
492 self.assertEqual(listing, [])
493
493
494 def test_delete_non_empty_dir(self):
494 def test_delete_non_empty_dir(self):
495 """delete non-empty dir raises 400"""
495 """delete non-empty dir raises 400"""
496 with assert_http_error(400):
496 with assert_http_error(400):
497 self.api.delete(u'Γ₯ b')
497 self.api.delete(u'Γ₯ b')
498
498
499 def test_rename(self):
499 def test_rename(self):
500 resp = self.api.rename('foo/a.ipynb', 'foo/z.ipynb')
500 resp = self.api.rename('foo/a.ipynb', 'foo/z.ipynb')
501 self.assertEqual(resp.headers['Location'].split('/')[-1], 'z.ipynb')
501 self.assertEqual(resp.headers['Location'].split('/')[-1], 'z.ipynb')
502 self.assertEqual(resp.json()['name'], 'z.ipynb')
502 self.assertEqual(resp.json()['name'], 'z.ipynb')
503 self.assertEqual(resp.json()['path'], 'foo/z.ipynb')
503 self.assertEqual(resp.json()['path'], 'foo/z.ipynb')
504 assert self.isfile('foo/z.ipynb')
504 assert self.isfile('foo/z.ipynb')
505
505
506 nbs = notebooks_only(self.api.list('foo').json())
506 nbs = notebooks_only(self.api.list('foo').json())
507 nbnames = set(n['name'] for n in nbs)
507 nbnames = set(n['name'] for n in nbs)
508 self.assertIn('z.ipynb', nbnames)
508 self.assertIn('z.ipynb', nbnames)
509 self.assertNotIn('a.ipynb', nbnames)
509 self.assertNotIn('a.ipynb', nbnames)
510
510
511 def test_checkpoints_follow_file(self):
512
513 # Read initial file state
514 orig = self.api.read('foo/a.ipynb')
515
516 # Create a checkpoint of initial state
517 r = self.api.new_checkpoint('foo/a.ipynb')
518 cp1 = r.json()
519
520 # Modify file and save
521 nbcontent = json.loads(orig.text)['content']
522 nb = from_dict(nbcontent)
523 hcell = new_markdown_cell('Created by test')
524 nb.cells.append(hcell)
525 nbmodel = {'content': nb, 'type': 'notebook'}
526 self.api.save('foo/a.ipynb', body=json.dumps(nbmodel))
527
528 # Rename the file.
529 self.api.rename('foo/a.ipynb', 'foo/z.ipynb')
530
531 # Looking for checkpoints in the old location should yield no results.
532 self.assertEqual(self.api.get_checkpoints('foo/a.ipynb').json(), [])
533
534 # Looking for checkpoints in the new location should work.
535 cps = self.api.get_checkpoints('foo/z.ipynb').json()
536 self.assertEqual(cps, [cp1])
537
538 # Delete the file. The checkpoint should be deleted as well.
539 self.api.delete('foo/z.ipynb')
540 cps = self.api.get_checkpoints('foo/z.ipynb').json()
541 self.assertEqual(cps, [])
542
511 def test_rename_existing(self):
543 def test_rename_existing(self):
512 with assert_http_error(409):
544 with assert_http_error(409):
513 self.api.rename('foo/a.ipynb', 'foo/b.ipynb')
545 self.api.rename('foo/a.ipynb', 'foo/b.ipynb')
514
546
515 def test_save(self):
547 def test_save(self):
516 resp = self.api.read('foo/a.ipynb')
548 resp = self.api.read('foo/a.ipynb')
517 nbcontent = json.loads(resp.text)['content']
549 nbcontent = json.loads(resp.text)['content']
518 nb = from_dict(nbcontent)
550 nb = from_dict(nbcontent)
519 nb.cells.append(new_markdown_cell(u'Created by test Β³'))
551 nb.cells.append(new_markdown_cell(u'Created by test Β³'))
520
552
521 nbmodel= {'content': nb, 'type': 'notebook'}
553 nbmodel = {'content': nb, 'type': 'notebook'}
522 resp = self.api.save('foo/a.ipynb', body=json.dumps(nbmodel))
554 resp = self.api.save('foo/a.ipynb', body=json.dumps(nbmodel))
523
555
524 nbcontent = self.api.read('foo/a.ipynb').json()['content']
556 nbcontent = self.api.read('foo/a.ipynb').json()['content']
525 newnb = from_dict(nbcontent)
557 newnb = from_dict(nbcontent)
526 self.assertEqual(newnb.cells[0].source,
558 self.assertEqual(newnb.cells[0].source,
527 u'Created by test Β³')
559 u'Created by test Β³')
528
560
529 def test_checkpoints(self):
561 def test_checkpoints(self):
530 resp = self.api.read('foo/a.ipynb')
562 resp = self.api.read('foo/a.ipynb')
531 r = self.api.new_checkpoint('foo/a.ipynb')
563 r = self.api.new_checkpoint('foo/a.ipynb')
532 self.assertEqual(r.status_code, 201)
564 self.assertEqual(r.status_code, 201)
533 cp1 = r.json()
565 cp1 = r.json()
534 self.assertEqual(set(cp1), {'id', 'last_modified'})
566 self.assertEqual(set(cp1), {'id', 'last_modified'})
535 self.assertEqual(r.headers['Location'].split('/')[-1], cp1['id'])
567 self.assertEqual(r.headers['Location'].split('/')[-1], cp1['id'])
536
568
537 # Modify it
569 # Modify it
538 nbcontent = json.loads(resp.text)['content']
570 nbcontent = json.loads(resp.text)['content']
539 nb = from_dict(nbcontent)
571 nb = from_dict(nbcontent)
540 hcell = new_markdown_cell('Created by test')
572 hcell = new_markdown_cell('Created by test')
541 nb.cells.append(hcell)
573 nb.cells.append(hcell)
542 # Save
574 # Save
543 nbmodel= {'content': nb, 'type': 'notebook'}
575 nbmodel= {'content': nb, 'type': 'notebook'}
544 resp = self.api.save('foo/a.ipynb', body=json.dumps(nbmodel))
576 resp = self.api.save('foo/a.ipynb', body=json.dumps(nbmodel))
545
577
546 # List checkpoints
578 # List checkpoints
547 cps = self.api.get_checkpoints('foo/a.ipynb').json()
579 cps = self.api.get_checkpoints('foo/a.ipynb').json()
548 self.assertEqual(cps, [cp1])
580 self.assertEqual(cps, [cp1])
549
581
550 nbcontent = self.api.read('foo/a.ipynb').json()['content']
582 nbcontent = self.api.read('foo/a.ipynb').json()['content']
551 nb = from_dict(nbcontent)
583 nb = from_dict(nbcontent)
552 self.assertEqual(nb.cells[0].source, 'Created by test')
584 self.assertEqual(nb.cells[0].source, 'Created by test')
553
585
554 # Restore cp1
586 # Restore cp1
555 r = self.api.restore_checkpoint('foo/a.ipynb', cp1['id'])
587 r = self.api.restore_checkpoint('foo/a.ipynb', cp1['id'])
556 self.assertEqual(r.status_code, 204)
588 self.assertEqual(r.status_code, 204)
557 nbcontent = self.api.read('foo/a.ipynb').json()['content']
589 nbcontent = self.api.read('foo/a.ipynb').json()['content']
558 nb = from_dict(nbcontent)
590 nb = from_dict(nbcontent)
559 self.assertEqual(nb.cells, [])
591 self.assertEqual(nb.cells, [])
560
592
561 # Delete cp1
593 # Delete cp1
562 r = self.api.delete_checkpoint('foo/a.ipynb', cp1['id'])
594 r = self.api.delete_checkpoint('foo/a.ipynb', cp1['id'])
563 self.assertEqual(r.status_code, 204)
595 self.assertEqual(r.status_code, 204)
564 cps = self.api.get_checkpoints('foo/a.ipynb').json()
596 cps = self.api.get_checkpoints('foo/a.ipynb').json()
565 self.assertEqual(cps, [])
597 self.assertEqual(cps, [])
566
598
567 def test_file_checkpoints(self):
599 def test_file_checkpoints(self):
568 """
600 """
569 Test checkpointing of non-notebook files.
601 Test checkpointing of non-notebook files.
570 """
602 """
571 filename = 'foo/a.txt'
603 filename = 'foo/a.txt'
572 resp = self.api.read(filename)
604 resp = self.api.read(filename)
573 orig_content = json.loads(resp.text)['content']
605 orig_content = json.loads(resp.text)['content']
574
606
575 # Create a checkpoint.
607 # Create a checkpoint.
576 r = self.api.new_checkpoint(filename)
608 r = self.api.new_checkpoint(filename)
577 self.assertEqual(r.status_code, 201)
609 self.assertEqual(r.status_code, 201)
578 cp1 = r.json()
610 cp1 = r.json()
579 self.assertEqual(set(cp1), {'id', 'last_modified'})
611 self.assertEqual(set(cp1), {'id', 'last_modified'})
580 self.assertEqual(r.headers['Location'].split('/')[-1], cp1['id'])
612 self.assertEqual(r.headers['Location'].split('/')[-1], cp1['id'])
581
613
582 # Modify the file and save.
614 # Modify the file and save.
583 new_content = orig_content + '\nsecond line'
615 new_content = orig_content + '\nsecond line'
584 model = {
616 model = {
585 'content': new_content,
617 'content': new_content,
586 'type': 'file',
618 'type': 'file',
587 'format': 'text',
619 'format': 'text',
588 }
620 }
589 resp = self.api.save(filename, body=json.dumps(model))
621 resp = self.api.save(filename, body=json.dumps(model))
590
622
591 # List checkpoints
623 # List checkpoints
592 cps = self.api.get_checkpoints(filename).json()
624 cps = self.api.get_checkpoints(filename).json()
593 self.assertEqual(cps, [cp1])
625 self.assertEqual(cps, [cp1])
594
626
595 content = self.api.read(filename).json()['content']
627 content = self.api.read(filename).json()['content']
596 self.assertEqual(content, new_content)
628 self.assertEqual(content, new_content)
597
629
598 # Restore cp1
630 # Restore cp1
599 r = self.api.restore_checkpoint(filename, cp1['id'])
631 r = self.api.restore_checkpoint(filename, cp1['id'])
600 self.assertEqual(r.status_code, 204)
632 self.assertEqual(r.status_code, 204)
601 restored_content = self.api.read(filename).json()['content']
633 restored_content = self.api.read(filename).json()['content']
602 self.assertEqual(restored_content, orig_content)
634 self.assertEqual(restored_content, orig_content)
603
635
604 # Delete cp1
636 # Delete cp1
605 r = self.api.delete_checkpoint(filename, cp1['id'])
637 r = self.api.delete_checkpoint(filename, cp1['id'])
606 self.assertEqual(r.status_code, 204)
638 self.assertEqual(r.status_code, 204)
607 cps = self.api.get_checkpoints(filename).json()
639 cps = self.api.get_checkpoints(filename).json()
608 self.assertEqual(cps, [])
640 self.assertEqual(cps, [])
609
641
610 @contextmanager
642 @contextmanager
611 def patch_cp_root(self, dirname):
643 def patch_cp_root(self, dirname):
612 """
644 """
613 Temporarily patch the root dir of our checkpoint manager.
645 Temporarily patch the root dir of our checkpoint manager.
614 """
646 """
615 cpm = self.notebook.contents_manager.checkpoints
647 cpm = self.notebook.contents_manager.checkpoints
616 old_dirname = cpm.root_dir
648 old_dirname = cpm.root_dir
617 cpm.root_dir = dirname
649 cpm.root_dir = dirname
618 try:
650 try:
619 yield
651 yield
620 finally:
652 finally:
621 cpm.root_dir = old_dirname
653 cpm.root_dir = old_dirname
622
654
623 def test_checkpoints_separate_root(self):
655 def test_checkpoints_separate_root(self):
624 """
656 """
625 Test that FileCheckpoints functions correctly even when it's
657 Test that FileCheckpoints functions correctly even when it's
626 using a different root dir from FileContentsManager. This also keeps
658 using a different root dir from FileContentsManager. This also keeps
627 the implementation honest for use with ContentsManagers that don't map
659 the implementation honest for use with ContentsManagers that don't map
628 models to the filesystem
660 models to the filesystem
629
661
630 Override this method to a no-op when testing other managers.
662 Override this method to a no-op when testing other managers.
631 """
663 """
632 with TemporaryDirectory() as td:
664 with TemporaryDirectory() as td:
633 with self.patch_cp_root(td):
665 with self.patch_cp_root(td):
634 self.test_checkpoints()
666 self.test_checkpoints()
635
667
636 with TemporaryDirectory() as td:
668 with TemporaryDirectory() as td:
637 with self.patch_cp_root(td):
669 with self.patch_cp_root(td):
638 self.test_file_checkpoints()
670 self.test_file_checkpoints()
639
671
640
672
641 class GenericFileCheckpointsAPITest(APITest):
673 class GenericFileCheckpointsAPITest(APITest):
642 """
674 """
643 Run the tests from APITest with GenericFileCheckpoints.
675 Run the tests from APITest with GenericFileCheckpoints.
644 """
676 """
645 config = Config()
677 config = Config()
646 config.FileContentsManager.checkpoints_class = GenericFileCheckpoints
678 config.FileContentsManager.checkpoints_class = GenericFileCheckpoints
647
679
648 def test_config_did_something(self):
680 def test_config_did_something(self):
649
681
650 self.assertIsInstance(
682 self.assertIsInstance(
651 self.notebook.contents_manager.checkpoints,
683 self.notebook.contents_manager.checkpoints,
652 GenericFileCheckpoints,
684 GenericFileCheckpoints,
653 )
685 )
654
686
655
687
General Comments 0
You need to be logged in to leave comments. Login now