##// END OF EJS Templates
TEST: Also test deletion.
Scott Sanderson -
Show More
@@ -1,683 +1,687 b''
1 1 # coding: utf-8
2 2 """Test the contents webservice API."""
3 3
4 4 import base64
5 5 from contextlib import contextmanager
6 6 import io
7 7 import json
8 8 import os
9 9 import shutil
10 10 from unicodedata import normalize
11 11
12 12 pjoin = os.path.join
13 13
14 14 import requests
15 15
16 16 from ..filecheckpoints import GenericFileCheckpoints
17 17
18 18 from IPython.config import Config
19 19 from IPython.html.utils import url_path_join, url_escape, to_os_path
20 20 from IPython.html.tests.launchnotebook import NotebookTestBase, assert_http_error
21 21 from IPython.nbformat import read, write, from_dict
22 22 from IPython.nbformat.v4 import (
23 23 new_notebook, new_markdown_cell,
24 24 )
25 25 from IPython.nbformat import v2
26 26 from IPython.utils import py3compat
27 27 from IPython.utils.data import uniq_stable
28 28 from IPython.utils.tempdir import TemporaryDirectory
29 29
30 30
31 31 def notebooks_only(dir_model):
32 32 return [nb for nb in dir_model['content'] if nb['type']=='notebook']
33 33
34 34 def dirs_only(dir_model):
35 35 return [x for x in dir_model['content'] if x['type']=='directory']
36 36
37 37
38 38 class API(object):
39 39 """Wrapper for contents API calls."""
40 40 def __init__(self, base_url):
41 41 self.base_url = base_url
42 42
43 43 def _req(self, verb, path, body=None, params=None):
44 44 response = requests.request(verb,
45 45 url_path_join(self.base_url, 'api/contents', path),
46 46 data=body, params=params,
47 47 )
48 48 response.raise_for_status()
49 49 return response
50 50
51 51 def list(self, path='/'):
52 52 return self._req('GET', path)
53 53
54 54 def read(self, path, type=None, format=None, content=None):
55 55 params = {}
56 56 if type is not None:
57 57 params['type'] = type
58 58 if format is not None:
59 59 params['format'] = format
60 60 if content == False:
61 61 params['content'] = '0'
62 62 return self._req('GET', path, params=params)
63 63
64 64 def create_untitled(self, path='/', ext='.ipynb'):
65 65 body = None
66 66 if ext:
67 67 body = json.dumps({'ext': ext})
68 68 return self._req('POST', path, body)
69 69
70 70 def mkdir_untitled(self, path='/'):
71 71 return self._req('POST', path, json.dumps({'type': 'directory'}))
72 72
73 73 def copy(self, copy_from, path='/'):
74 74 body = json.dumps({'copy_from':copy_from})
75 75 return self._req('POST', path, body)
76 76
77 77 def create(self, path='/'):
78 78 return self._req('PUT', path)
79 79
80 80 def upload(self, path, body):
81 81 return self._req('PUT', path, body)
82 82
83 83 def mkdir(self, path='/'):
84 84 return self._req('PUT', path, json.dumps({'type': 'directory'}))
85 85
86 86 def copy_put(self, copy_from, path='/'):
87 87 body = json.dumps({'copy_from':copy_from})
88 88 return self._req('PUT', path, body)
89 89
90 90 def save(self, path, body):
91 91 return self._req('PUT', path, body)
92 92
93 93 def delete(self, path='/'):
94 94 return self._req('DELETE', path)
95 95
96 96 def rename(self, path, new_path):
97 97 body = json.dumps({'path': new_path})
98 98 return self._req('PATCH', path, body)
99 99
100 100 def get_checkpoints(self, path):
101 101 return self._req('GET', url_path_join(path, 'checkpoints'))
102 102
103 103 def new_checkpoint(self, path):
104 104 return self._req('POST', url_path_join(path, 'checkpoints'))
105 105
106 106 def restore_checkpoint(self, path, checkpoint_id):
107 107 return self._req('POST', url_path_join(path, 'checkpoints', checkpoint_id))
108 108
109 109 def delete_checkpoint(self, path, checkpoint_id):
110 110 return self._req('DELETE', url_path_join(path, 'checkpoints', checkpoint_id))
111 111
112 112 class APITest(NotebookTestBase):
113 113 """Test the kernels web service API"""
114 114 dirs_nbs = [('', 'inroot'),
115 115 ('Directory with spaces in', 'inspace'),
116 116 (u'unicodΓ©', 'innonascii'),
117 117 ('foo', 'a'),
118 118 ('foo', 'b'),
119 119 ('foo', 'name with spaces'),
120 120 ('foo', u'unicodΓ©'),
121 121 ('foo/bar', 'baz'),
122 122 ('ordering', 'A'),
123 123 ('ordering', 'b'),
124 124 ('ordering', 'C'),
125 125 (u'Γ₯ b', u'Γ§ d'),
126 126 ]
127 127 hidden_dirs = ['.hidden', '__pycache__']
128 128
129 129 # Don't include root dir.
130 130 dirs = uniq_stable([py3compat.cast_unicode(d) for (d,n) in dirs_nbs[1:]])
131 131 top_level_dirs = {normalize('NFC', d.split('/')[0]) for d in dirs}
132 132
133 133 @staticmethod
134 134 def _blob_for_name(name):
135 135 return name.encode('utf-8') + b'\xFF'
136 136
137 137 @staticmethod
138 138 def _txt_for_name(name):
139 139 return u'%s text file' % name
140 140
141 141 def to_os_path(self, api_path):
142 142 return to_os_path(api_path, root=self.notebook_dir.name)
143 143
144 144 def make_dir(self, api_path):
145 145 """Create a directory at api_path"""
146 146 os_path = self.to_os_path(api_path)
147 147 try:
148 148 os.makedirs(os_path)
149 149 except OSError:
150 150 print("Directory already exists: %r" % os_path)
151 151
152 152 def make_txt(self, api_path, txt):
153 153 """Make a text file at a given api_path"""
154 154 os_path = self.to_os_path(api_path)
155 155 with io.open(os_path, 'w', encoding='utf-8') as f:
156 156 f.write(txt)
157 157
158 158 def make_blob(self, api_path, blob):
159 159 """Make a binary file at a given api_path"""
160 160 os_path = self.to_os_path(api_path)
161 161 with io.open(os_path, 'wb') as f:
162 162 f.write(blob)
163 163
164 164 def make_nb(self, api_path, nb):
165 165 """Make a notebook file at a given api_path"""
166 166 os_path = self.to_os_path(api_path)
167 167
168 168 with io.open(os_path, 'w', encoding='utf-8') as f:
169 169 write(nb, f, version=4)
170 170
171 171 def delete_dir(self, api_path):
172 172 """Delete a directory at api_path, removing any contents."""
173 173 os_path = self.to_os_path(api_path)
174 174 shutil.rmtree(os_path, ignore_errors=True)
175 175
176 176 def delete_file(self, api_path):
177 177 """Delete a file at the given path if it exists."""
178 178 if self.isfile(api_path):
179 179 os.unlink(self.to_os_path(api_path))
180 180
181 181 def isfile(self, api_path):
182 182 return os.path.isfile(self.to_os_path(api_path))
183 183
184 184 def isdir(self, api_path):
185 185 return os.path.isdir(self.to_os_path(api_path))
186 186
187 187 def setUp(self):
188 188
189 189 for d in (self.dirs + self.hidden_dirs):
190 190 self.make_dir(d)
191 191
192 192 for d, name in self.dirs_nbs:
193 193 # create a notebook
194 194 nb = new_notebook()
195 195 self.make_nb(u'{}/{}.ipynb'.format(d, name), nb)
196 196
197 197 # create a text file
198 198 txt = self._txt_for_name(name)
199 199 self.make_txt(u'{}/{}.txt'.format(d, name), txt)
200 200
201 201 # create a binary file
202 202 blob = self._blob_for_name(name)
203 203 self.make_blob(u'{}/{}.blob'.format(d, name), blob)
204 204
205 205 self.api = API(self.base_url())
206 206
207 207 def tearDown(self):
208 208 for dname in (list(self.top_level_dirs) + self.hidden_dirs):
209 209 self.delete_dir(dname)
210 210 self.delete_file('inroot.ipynb')
211 211
212 212 def test_list_notebooks(self):
213 213 nbs = notebooks_only(self.api.list().json())
214 214 self.assertEqual(len(nbs), 1)
215 215 self.assertEqual(nbs[0]['name'], 'inroot.ipynb')
216 216
217 217 nbs = notebooks_only(self.api.list('/Directory with spaces in/').json())
218 218 self.assertEqual(len(nbs), 1)
219 219 self.assertEqual(nbs[0]['name'], 'inspace.ipynb')
220 220
221 221 nbs = notebooks_only(self.api.list(u'/unicodΓ©/').json())
222 222 self.assertEqual(len(nbs), 1)
223 223 self.assertEqual(nbs[0]['name'], 'innonascii.ipynb')
224 224 self.assertEqual(nbs[0]['path'], u'unicodΓ©/innonascii.ipynb')
225 225
226 226 nbs = notebooks_only(self.api.list('/foo/bar/').json())
227 227 self.assertEqual(len(nbs), 1)
228 228 self.assertEqual(nbs[0]['name'], 'baz.ipynb')
229 229 self.assertEqual(nbs[0]['path'], 'foo/bar/baz.ipynb')
230 230
231 231 nbs = notebooks_only(self.api.list('foo').json())
232 232 self.assertEqual(len(nbs), 4)
233 233 nbnames = { normalize('NFC', n['name']) for n in nbs }
234 234 expected = [ u'a.ipynb', u'b.ipynb', u'name with spaces.ipynb', u'unicodΓ©.ipynb']
235 235 expected = { normalize('NFC', name) for name in expected }
236 236 self.assertEqual(nbnames, expected)
237 237
238 238 nbs = notebooks_only(self.api.list('ordering').json())
239 239 nbnames = [n['name'] for n in nbs]
240 240 expected = ['A.ipynb', 'b.ipynb', 'C.ipynb']
241 241 self.assertEqual(nbnames, expected)
242 242
243 243 def test_list_dirs(self):
244 244 dirs = dirs_only(self.api.list().json())
245 245 dir_names = {normalize('NFC', d['name']) for d in dirs}
246 246 self.assertEqual(dir_names, self.top_level_dirs) # Excluding hidden dirs
247 247
248 248 def test_get_dir_no_content(self):
249 249 for d in self.dirs:
250 250 model = self.api.read(d, content=False).json()
251 251 self.assertEqual(model['path'], d)
252 252 self.assertEqual(model['type'], 'directory')
253 253 self.assertIn('content', model)
254 254 self.assertEqual(model['content'], None)
255 255
256 256 def test_list_nonexistant_dir(self):
257 257 with assert_http_error(404):
258 258 self.api.list('nonexistant')
259 259
260 260 def test_get_nb_contents(self):
261 261 for d, name in self.dirs_nbs:
262 262 path = url_path_join(d, name + '.ipynb')
263 263 nb = self.api.read(path).json()
264 264 self.assertEqual(nb['name'], u'%s.ipynb' % name)
265 265 self.assertEqual(nb['path'], path)
266 266 self.assertEqual(nb['type'], 'notebook')
267 267 self.assertIn('content', nb)
268 268 self.assertEqual(nb['format'], 'json')
269 269 self.assertIn('metadata', nb['content'])
270 270 self.assertIsInstance(nb['content']['metadata'], dict)
271 271
272 272 def test_get_nb_no_content(self):
273 273 for d, name in self.dirs_nbs:
274 274 path = url_path_join(d, name + '.ipynb')
275 275 nb = self.api.read(path, content=False).json()
276 276 self.assertEqual(nb['name'], u'%s.ipynb' % name)
277 277 self.assertEqual(nb['path'], path)
278 278 self.assertEqual(nb['type'], 'notebook')
279 279 self.assertIn('content', nb)
280 280 self.assertEqual(nb['content'], None)
281 281
282 282 def test_get_contents_no_such_file(self):
283 283 # Name that doesn't exist - should be a 404
284 284 with assert_http_error(404):
285 285 self.api.read('foo/q.ipynb')
286 286
287 287 def test_get_text_file_contents(self):
288 288 for d, name in self.dirs_nbs:
289 289 path = url_path_join(d, name + '.txt')
290 290 model = self.api.read(path).json()
291 291 self.assertEqual(model['name'], u'%s.txt' % name)
292 292 self.assertEqual(model['path'], path)
293 293 self.assertIn('content', model)
294 294 self.assertEqual(model['format'], 'text')
295 295 self.assertEqual(model['type'], 'file')
296 296 self.assertEqual(model['content'], self._txt_for_name(name))
297 297
298 298 # Name that doesn't exist - should be a 404
299 299 with assert_http_error(404):
300 300 self.api.read('foo/q.txt')
301 301
302 302 # Specifying format=text should fail on a non-UTF-8 file
303 303 with assert_http_error(400):
304 304 self.api.read('foo/bar/baz.blob', type='file', format='text')
305 305
306 306 def test_get_binary_file_contents(self):
307 307 for d, name in self.dirs_nbs:
308 308 path = url_path_join(d, name + '.blob')
309 309 model = self.api.read(path).json()
310 310 self.assertEqual(model['name'], u'%s.blob' % name)
311 311 self.assertEqual(model['path'], path)
312 312 self.assertIn('content', model)
313 313 self.assertEqual(model['format'], 'base64')
314 314 self.assertEqual(model['type'], 'file')
315 315 self.assertEqual(
316 316 base64.decodestring(model['content'].encode('ascii')),
317 317 self._blob_for_name(name),
318 318 )
319 319
320 320 # Name that doesn't exist - should be a 404
321 321 with assert_http_error(404):
322 322 self.api.read('foo/q.txt')
323 323
324 324 def test_get_bad_type(self):
325 325 with assert_http_error(400):
326 326 self.api.read(u'unicodΓ©', type='file') # this is a directory
327 327
328 328 with assert_http_error(400):
329 329 self.api.read(u'unicodΓ©/innonascii.ipynb', type='directory')
330 330
331 331 def _check_created(self, resp, path, type='notebook'):
332 332 self.assertEqual(resp.status_code, 201)
333 333 location_header = py3compat.str_to_unicode(resp.headers['Location'])
334 334 self.assertEqual(location_header, url_escape(url_path_join(u'/api/contents', path)))
335 335 rjson = resp.json()
336 336 self.assertEqual(rjson['name'], path.rsplit('/', 1)[-1])
337 337 self.assertEqual(rjson['path'], path)
338 338 self.assertEqual(rjson['type'], type)
339 339 isright = self.isdir if type == 'directory' else self.isfile
340 340 assert isright(path)
341 341
342 342 def test_create_untitled(self):
343 343 resp = self.api.create_untitled(path=u'Γ₯ b')
344 344 self._check_created(resp, u'Γ₯ b/Untitled.ipynb')
345 345
346 346 # Second time
347 347 resp = self.api.create_untitled(path=u'Γ₯ b')
348 348 self._check_created(resp, u'Γ₯ b/Untitled1.ipynb')
349 349
350 350 # And two directories down
351 351 resp = self.api.create_untitled(path='foo/bar')
352 352 self._check_created(resp, 'foo/bar/Untitled.ipynb')
353 353
354 354 def test_create_untitled_txt(self):
355 355 resp = self.api.create_untitled(path='foo/bar', ext='.txt')
356 356 self._check_created(resp, 'foo/bar/untitled.txt', type='file')
357 357
358 358 resp = self.api.read(path='foo/bar/untitled.txt')
359 359 model = resp.json()
360 360 self.assertEqual(model['type'], 'file')
361 361 self.assertEqual(model['format'], 'text')
362 362 self.assertEqual(model['content'], '')
363 363
364 364 def test_upload(self):
365 365 nb = new_notebook()
366 366 nbmodel = {'content': nb, 'type': 'notebook'}
367 367 path = u'Γ₯ b/Upload tΓ©st.ipynb'
368 368 resp = self.api.upload(path, body=json.dumps(nbmodel))
369 369 self._check_created(resp, path)
370 370
371 371 def test_mkdir_untitled(self):
372 372 resp = self.api.mkdir_untitled(path=u'Γ₯ b')
373 373 self._check_created(resp, u'Γ₯ b/Untitled Folder', type='directory')
374 374
375 375 # Second time
376 376 resp = self.api.mkdir_untitled(path=u'Γ₯ b')
377 377 self._check_created(resp, u'Γ₯ b/Untitled Folder 1', type='directory')
378 378
379 379 # And two directories down
380 380 resp = self.api.mkdir_untitled(path='foo/bar')
381 381 self._check_created(resp, 'foo/bar/Untitled Folder', type='directory')
382 382
383 383 def test_mkdir(self):
384 384 path = u'Γ₯ b/New βˆ‚ir'
385 385 resp = self.api.mkdir(path)
386 386 self._check_created(resp, path, type='directory')
387 387
388 388 def test_mkdir_hidden_400(self):
389 389 with assert_http_error(400):
390 390 resp = self.api.mkdir(u'Γ₯ b/.hidden')
391 391
392 392 def test_upload_txt(self):
393 393 body = u'ΓΌnicode tΓ©xt'
394 394 model = {
395 395 'content' : body,
396 396 'format' : 'text',
397 397 'type' : 'file',
398 398 }
399 399 path = u'Γ₯ b/Upload tΓ©st.txt'
400 400 resp = self.api.upload(path, body=json.dumps(model))
401 401
402 402 # check roundtrip
403 403 resp = self.api.read(path)
404 404 model = resp.json()
405 405 self.assertEqual(model['type'], 'file')
406 406 self.assertEqual(model['format'], 'text')
407 407 self.assertEqual(model['content'], body)
408 408
409 409 def test_upload_b64(self):
410 410 body = b'\xFFblob'
411 411 b64body = base64.encodestring(body).decode('ascii')
412 412 model = {
413 413 'content' : b64body,
414 414 'format' : 'base64',
415 415 'type' : 'file',
416 416 }
417 417 path = u'Γ₯ b/Upload tΓ©st.blob'
418 418 resp = self.api.upload(path, body=json.dumps(model))
419 419
420 420 # check roundtrip
421 421 resp = self.api.read(path)
422 422 model = resp.json()
423 423 self.assertEqual(model['type'], 'file')
424 424 self.assertEqual(model['path'], path)
425 425 self.assertEqual(model['format'], 'base64')
426 426 decoded = base64.decodestring(model['content'].encode('ascii'))
427 427 self.assertEqual(decoded, body)
428 428
429 429 def test_upload_v2(self):
430 430 nb = v2.new_notebook()
431 431 ws = v2.new_worksheet()
432 432 nb.worksheets.append(ws)
433 433 ws.cells.append(v2.new_code_cell(input='print("hi")'))
434 434 nbmodel = {'content': nb, 'type': 'notebook'}
435 435 path = u'Γ₯ b/Upload tΓ©st.ipynb'
436 436 resp = self.api.upload(path, body=json.dumps(nbmodel))
437 437 self._check_created(resp, path)
438 438 resp = self.api.read(path)
439 439 data = resp.json()
440 440 self.assertEqual(data['content']['nbformat'], 4)
441 441
442 442 def test_copy(self):
443 443 resp = self.api.copy(u'Γ₯ b/Γ§ d.ipynb', u'Γ₯ b')
444 444 self._check_created(resp, u'Γ₯ b/Γ§ d-Copy1.ipynb')
445 445
446 446 resp = self.api.copy(u'Γ₯ b/Γ§ d.ipynb', u'Γ₯ b')
447 447 self._check_created(resp, u'Γ₯ b/Γ§ d-Copy2.ipynb')
448 448
449 449 def test_copy_copy(self):
450 450 resp = self.api.copy(u'Γ₯ b/Γ§ d.ipynb', u'Γ₯ b')
451 451 self._check_created(resp, u'Γ₯ b/Γ§ d-Copy1.ipynb')
452 452
453 453 resp = self.api.copy(u'Γ₯ b/Γ§ d-Copy1.ipynb', u'Γ₯ b')
454 454 self._check_created(resp, u'Γ₯ b/Γ§ d-Copy2.ipynb')
455 455
456 456 def test_copy_path(self):
457 457 resp = self.api.copy(u'foo/a.ipynb', u'Γ₯ b')
458 458 self._check_created(resp, u'Γ₯ b/a.ipynb')
459 459
460 460 resp = self.api.copy(u'foo/a.ipynb', u'Γ₯ b')
461 461 self._check_created(resp, u'Γ₯ b/a-Copy1.ipynb')
462 462
463 463 def test_copy_put_400(self):
464 464 with assert_http_error(400):
465 465 resp = self.api.copy_put(u'Γ₯ b/Γ§ d.ipynb', u'Γ₯ b/cΓΈpy.ipynb')
466 466
467 467 def test_copy_dir_400(self):
468 468 # can't copy directories
469 469 with assert_http_error(400):
470 470 resp = self.api.copy(u'Γ₯ b', u'foo')
471 471
472 472 def test_delete(self):
473 473 for d, name in self.dirs_nbs:
474 474 print('%r, %r' % (d, name))
475 475 resp = self.api.delete(url_path_join(d, name + '.ipynb'))
476 476 self.assertEqual(resp.status_code, 204)
477 477
478 478 for d in self.dirs + ['/']:
479 479 nbs = notebooks_only(self.api.list(d).json())
480 480 print('------')
481 481 print(d)
482 482 print(nbs)
483 483 self.assertEqual(nbs, [])
484 484
485 485 def test_delete_dirs(self):
486 486 # depth-first delete everything, so we don't try to delete empty directories
487 487 for name in sorted(self.dirs + ['/'], key=len, reverse=True):
488 488 listing = self.api.list(name).json()['content']
489 489 for model in listing:
490 490 self.api.delete(model['path'])
491 491 listing = self.api.list('/').json()['content']
492 492 self.assertEqual(listing, [])
493 493
494 494 def test_delete_non_empty_dir(self):
495 495 """delete non-empty dir raises 400"""
496 496 with assert_http_error(400):
497 497 self.api.delete(u'Γ₯ b')
498 498
499 499 def test_rename(self):
500 500 resp = self.api.rename('foo/a.ipynb', 'foo/z.ipynb')
501 501 self.assertEqual(resp.headers['Location'].split('/')[-1], 'z.ipynb')
502 502 self.assertEqual(resp.json()['name'], 'z.ipynb')
503 503 self.assertEqual(resp.json()['path'], 'foo/z.ipynb')
504 504 assert self.isfile('foo/z.ipynb')
505 505
506 506 nbs = notebooks_only(self.api.list('foo').json())
507 507 nbnames = set(n['name'] for n in nbs)
508 508 self.assertIn('z.ipynb', nbnames)
509 509 self.assertNotIn('a.ipynb', nbnames)
510 510
511 def test_rename_preserves_checkpoints(self):
511 def test_checkpoints_follow_file(self):
512 512
513 513 # Read initial file state
514 514 orig = self.api.read('foo/a.ipynb')
515 515
516 516 # Create a checkpoint of initial state
517 517 r = self.api.new_checkpoint('foo/a.ipynb')
518 518 cp1 = r.json()
519 519
520 520 # Modify file and save
521 521 nbcontent = json.loads(orig.text)['content']
522 522 nb = from_dict(nbcontent)
523 523 hcell = new_markdown_cell('Created by test')
524 524 nb.cells.append(hcell)
525 525 nbmodel = {'content': nb, 'type': 'notebook'}
526 526 self.api.save('foo/a.ipynb', body=json.dumps(nbmodel))
527 527
528 528 # Rename the file.
529 529 self.api.rename('foo/a.ipynb', 'foo/z.ipynb')
530 530
531 531 # Looking for checkpoints in the old location should yield no results.
532 532 self.assertEqual(self.api.get_checkpoints('foo/a.ipynb').json(), [])
533 533
534 534 # Looking for checkpoints in the new location should work.
535 535 cps = self.api.get_checkpoints('foo/z.ipynb').json()
536 self.assertEqual(len(cps), 1)
537 self.assertEqual(cps[0], cp1)
536 self.assertEqual(cps, [cp1])
537
538 # Delete the file. The checkpoint should be deleted as well.
539 self.api.delete('foo/z.ipynb')
540 cps = self.api.get_checkpoints('foo/z.ipynb').json()
541 self.assertEqual(cps, [])
538 542
539 543 def test_rename_existing(self):
540 544 with assert_http_error(409):
541 545 self.api.rename('foo/a.ipynb', 'foo/b.ipynb')
542 546
543 547 def test_save(self):
544 548 resp = self.api.read('foo/a.ipynb')
545 549 nbcontent = json.loads(resp.text)['content']
546 550 nb = from_dict(nbcontent)
547 551 nb.cells.append(new_markdown_cell(u'Created by test Β³'))
548 552
549 553 nbmodel = {'content': nb, 'type': 'notebook'}
550 554 resp = self.api.save('foo/a.ipynb', body=json.dumps(nbmodel))
551 555
552 556 nbcontent = self.api.read('foo/a.ipynb').json()['content']
553 557 newnb = from_dict(nbcontent)
554 558 self.assertEqual(newnb.cells[0].source,
555 559 u'Created by test Β³')
556 560
557 561 def test_checkpoints(self):
558 562 resp = self.api.read('foo/a.ipynb')
559 563 r = self.api.new_checkpoint('foo/a.ipynb')
560 564 self.assertEqual(r.status_code, 201)
561 565 cp1 = r.json()
562 566 self.assertEqual(set(cp1), {'id', 'last_modified'})
563 567 self.assertEqual(r.headers['Location'].split('/')[-1], cp1['id'])
564 568
565 569 # Modify it
566 570 nbcontent = json.loads(resp.text)['content']
567 571 nb = from_dict(nbcontent)
568 572 hcell = new_markdown_cell('Created by test')
569 573 nb.cells.append(hcell)
570 574 # Save
571 575 nbmodel= {'content': nb, 'type': 'notebook'}
572 576 resp = self.api.save('foo/a.ipynb', body=json.dumps(nbmodel))
573 577
574 578 # List checkpoints
575 579 cps = self.api.get_checkpoints('foo/a.ipynb').json()
576 580 self.assertEqual(cps, [cp1])
577 581
578 582 nbcontent = self.api.read('foo/a.ipynb').json()['content']
579 583 nb = from_dict(nbcontent)
580 584 self.assertEqual(nb.cells[0].source, 'Created by test')
581 585
582 586 # Restore cp1
583 587 r = self.api.restore_checkpoint('foo/a.ipynb', cp1['id'])
584 588 self.assertEqual(r.status_code, 204)
585 589 nbcontent = self.api.read('foo/a.ipynb').json()['content']
586 590 nb = from_dict(nbcontent)
587 591 self.assertEqual(nb.cells, [])
588 592
589 593 # Delete cp1
590 594 r = self.api.delete_checkpoint('foo/a.ipynb', cp1['id'])
591 595 self.assertEqual(r.status_code, 204)
592 596 cps = self.api.get_checkpoints('foo/a.ipynb').json()
593 597 self.assertEqual(cps, [])
594 598
595 599 def test_file_checkpoints(self):
596 600 """
597 601 Test checkpointing of non-notebook files.
598 602 """
599 603 filename = 'foo/a.txt'
600 604 resp = self.api.read(filename)
601 605 orig_content = json.loads(resp.text)['content']
602 606
603 607 # Create a checkpoint.
604 608 r = self.api.new_checkpoint(filename)
605 609 self.assertEqual(r.status_code, 201)
606 610 cp1 = r.json()
607 611 self.assertEqual(set(cp1), {'id', 'last_modified'})
608 612 self.assertEqual(r.headers['Location'].split('/')[-1], cp1['id'])
609 613
610 614 # Modify the file and save.
611 615 new_content = orig_content + '\nsecond line'
612 616 model = {
613 617 'content': new_content,
614 618 'type': 'file',
615 619 'format': 'text',
616 620 }
617 621 resp = self.api.save(filename, body=json.dumps(model))
618 622
619 623 # List checkpoints
620 624 cps = self.api.get_checkpoints(filename).json()
621 625 self.assertEqual(cps, [cp1])
622 626
623 627 content = self.api.read(filename).json()['content']
624 628 self.assertEqual(content, new_content)
625 629
626 630 # Restore cp1
627 631 r = self.api.restore_checkpoint(filename, cp1['id'])
628 632 self.assertEqual(r.status_code, 204)
629 633 restored_content = self.api.read(filename).json()['content']
630 634 self.assertEqual(restored_content, orig_content)
631 635
632 636 # Delete cp1
633 637 r = self.api.delete_checkpoint(filename, cp1['id'])
634 638 self.assertEqual(r.status_code, 204)
635 639 cps = self.api.get_checkpoints(filename).json()
636 640 self.assertEqual(cps, [])
637 641
638 642 @contextmanager
639 643 def patch_cp_root(self, dirname):
640 644 """
641 645 Temporarily patch the root dir of our checkpoint manager.
642 646 """
643 647 cpm = self.notebook.contents_manager.checkpoints
644 648 old_dirname = cpm.root_dir
645 649 cpm.root_dir = dirname
646 650 try:
647 651 yield
648 652 finally:
649 653 cpm.root_dir = old_dirname
650 654
651 655 def test_checkpoints_separate_root(self):
652 656 """
653 657 Test that FileCheckpoints functions correctly even when it's
654 658 using a different root dir from FileContentsManager. This also keeps
655 659 the implementation honest for use with ContentsManagers that don't map
656 660 models to the filesystem
657 661
658 662 Override this method to a no-op when testing other managers.
659 663 """
660 664 with TemporaryDirectory() as td:
661 665 with self.patch_cp_root(td):
662 666 self.test_checkpoints()
663 667
664 668 with TemporaryDirectory() as td:
665 669 with self.patch_cp_root(td):
666 670 self.test_file_checkpoints()
667 671
668 672
669 673 class GenericFileCheckpointsAPITest(APITest):
670 674 """
671 675 Run the tests from APITest with GenericFileCheckpoints.
672 676 """
673 677 config = Config()
674 678 config.FileContentsManager.checkpoints_class = GenericFileCheckpoints
675 679
676 680 def test_config_did_something(self):
677 681
678 682 self.assertIsInstance(
679 683 self.notebook.contents_manager.checkpoints,
680 684 GenericFileCheckpoints,
681 685 )
682 686
683 687
General Comments 0
You need to be logged in to leave comments. Login now