##// END OF EJS Templates
Backport PR #8102: TEST: Add test for renaming files with checkpoint....
Min RK -
Show More
@@ -1,655 +1,687 b''
1 1 # coding: utf-8
2 2 """Test the contents webservice API."""
3 3
4 4 import base64
5 5 from contextlib import contextmanager
6 6 import io
7 7 import json
8 8 import os
9 9 import shutil
10 10 from unicodedata import normalize
11 11
12 12 pjoin = os.path.join
13 13
14 14 import requests
15 15
16 16 from ..filecheckpoints import GenericFileCheckpoints
17 17
18 18 from IPython.config import Config
19 19 from IPython.html.utils import url_path_join, url_escape, to_os_path
20 20 from IPython.html.tests.launchnotebook import NotebookTestBase, assert_http_error
21 21 from IPython.nbformat import read, write, from_dict
22 22 from IPython.nbformat.v4 import (
23 23 new_notebook, new_markdown_cell,
24 24 )
25 25 from IPython.nbformat import v2
26 26 from IPython.utils import py3compat
27 27 from IPython.utils.data import uniq_stable
28 28 from IPython.utils.tempdir import TemporaryDirectory
29 29
30 30
31 31 def notebooks_only(dir_model):
32 32 return [nb for nb in dir_model['content'] if nb['type']=='notebook']
33 33
34 34 def dirs_only(dir_model):
35 35 return [x for x in dir_model['content'] if x['type']=='directory']
36 36
37 37
38 38 class API(object):
39 39 """Wrapper for contents API calls."""
40 40 def __init__(self, base_url):
41 41 self.base_url = base_url
42 42
43 43 def _req(self, verb, path, body=None, params=None):
44 44 response = requests.request(verb,
45 45 url_path_join(self.base_url, 'api/contents', path),
46 46 data=body, params=params,
47 47 )
48 48 response.raise_for_status()
49 49 return response
50 50
51 51 def list(self, path='/'):
52 52 return self._req('GET', path)
53 53
54 54 def read(self, path, type=None, format=None, content=None):
55 55 params = {}
56 56 if type is not None:
57 57 params['type'] = type
58 58 if format is not None:
59 59 params['format'] = format
60 60 if content == False:
61 61 params['content'] = '0'
62 62 return self._req('GET', path, params=params)
63 63
64 64 def create_untitled(self, path='/', ext='.ipynb'):
65 65 body = None
66 66 if ext:
67 67 body = json.dumps({'ext': ext})
68 68 return self._req('POST', path, body)
69 69
70 70 def mkdir_untitled(self, path='/'):
71 71 return self._req('POST', path, json.dumps({'type': 'directory'}))
72 72
73 73 def copy(self, copy_from, path='/'):
74 74 body = json.dumps({'copy_from':copy_from})
75 75 return self._req('POST', path, body)
76 76
77 77 def create(self, path='/'):
78 78 return self._req('PUT', path)
79 79
80 80 def upload(self, path, body):
81 81 return self._req('PUT', path, body)
82 82
83 83 def mkdir(self, path='/'):
84 84 return self._req('PUT', path, json.dumps({'type': 'directory'}))
85 85
86 86 def copy_put(self, copy_from, path='/'):
87 87 body = json.dumps({'copy_from':copy_from})
88 88 return self._req('PUT', path, body)
89 89
90 90 def save(self, path, body):
91 91 return self._req('PUT', path, body)
92 92
93 93 def delete(self, path='/'):
94 94 return self._req('DELETE', path)
95 95
96 96 def rename(self, path, new_path):
97 97 body = json.dumps({'path': new_path})
98 98 return self._req('PATCH', path, body)
99 99
100 100 def get_checkpoints(self, path):
101 101 return self._req('GET', url_path_join(path, 'checkpoints'))
102 102
103 103 def new_checkpoint(self, path):
104 104 return self._req('POST', url_path_join(path, 'checkpoints'))
105 105
106 106 def restore_checkpoint(self, path, checkpoint_id):
107 107 return self._req('POST', url_path_join(path, 'checkpoints', checkpoint_id))
108 108
109 109 def delete_checkpoint(self, path, checkpoint_id):
110 110 return self._req('DELETE', url_path_join(path, 'checkpoints', checkpoint_id))
111 111
112 112 class APITest(NotebookTestBase):
113 113 """Test the kernels web service API"""
114 114 dirs_nbs = [('', 'inroot'),
115 115 ('Directory with spaces in', 'inspace'),
116 116 (u'unicodΓ©', 'innonascii'),
117 117 ('foo', 'a'),
118 118 ('foo', 'b'),
119 119 ('foo', 'name with spaces'),
120 120 ('foo', u'unicodΓ©'),
121 121 ('foo/bar', 'baz'),
122 122 ('ordering', 'A'),
123 123 ('ordering', 'b'),
124 124 ('ordering', 'C'),
125 125 (u'Γ₯ b', u'Γ§ d'),
126 126 ]
127 127 hidden_dirs = ['.hidden', '__pycache__']
128 128
129 129 # Don't include root dir.
130 130 dirs = uniq_stable([py3compat.cast_unicode(d) for (d,n) in dirs_nbs[1:]])
131 131 top_level_dirs = {normalize('NFC', d.split('/')[0]) for d in dirs}
132 132
133 133 @staticmethod
134 134 def _blob_for_name(name):
135 135 return name.encode('utf-8') + b'\xFF'
136 136
137 137 @staticmethod
138 138 def _txt_for_name(name):
139 139 return u'%s text file' % name
140 140
141 141 def to_os_path(self, api_path):
142 142 return to_os_path(api_path, root=self.notebook_dir.name)
143 143
144 144 def make_dir(self, api_path):
145 145 """Create a directory at api_path"""
146 146 os_path = self.to_os_path(api_path)
147 147 try:
148 148 os.makedirs(os_path)
149 149 except OSError:
150 150 print("Directory already exists: %r" % os_path)
151 151
152 152 def make_txt(self, api_path, txt):
153 153 """Make a text file at a given api_path"""
154 154 os_path = self.to_os_path(api_path)
155 155 with io.open(os_path, 'w', encoding='utf-8') as f:
156 156 f.write(txt)
157 157
158 158 def make_blob(self, api_path, blob):
159 159 """Make a binary file at a given api_path"""
160 160 os_path = self.to_os_path(api_path)
161 161 with io.open(os_path, 'wb') as f:
162 162 f.write(blob)
163 163
164 164 def make_nb(self, api_path, nb):
165 165 """Make a notebook file at a given api_path"""
166 166 os_path = self.to_os_path(api_path)
167 167
168 168 with io.open(os_path, 'w', encoding='utf-8') as f:
169 169 write(nb, f, version=4)
170 170
171 171 def delete_dir(self, api_path):
172 172 """Delete a directory at api_path, removing any contents."""
173 173 os_path = self.to_os_path(api_path)
174 174 shutil.rmtree(os_path, ignore_errors=True)
175 175
176 176 def delete_file(self, api_path):
177 177 """Delete a file at the given path if it exists."""
178 178 if self.isfile(api_path):
179 179 os.unlink(self.to_os_path(api_path))
180 180
181 181 def isfile(self, api_path):
182 182 return os.path.isfile(self.to_os_path(api_path))
183 183
184 184 def isdir(self, api_path):
185 185 return os.path.isdir(self.to_os_path(api_path))
186 186
187 187 def setUp(self):
188 188
189 189 for d in (self.dirs + self.hidden_dirs):
190 190 self.make_dir(d)
191 191
192 192 for d, name in self.dirs_nbs:
193 193 # create a notebook
194 194 nb = new_notebook()
195 195 self.make_nb(u'{}/{}.ipynb'.format(d, name), nb)
196 196
197 197 # create a text file
198 198 txt = self._txt_for_name(name)
199 199 self.make_txt(u'{}/{}.txt'.format(d, name), txt)
200 200
201 201 # create a binary file
202 202 blob = self._blob_for_name(name)
203 203 self.make_blob(u'{}/{}.blob'.format(d, name), blob)
204 204
205 205 self.api = API(self.base_url())
206 206
207 207 def tearDown(self):
208 208 for dname in (list(self.top_level_dirs) + self.hidden_dirs):
209 209 self.delete_dir(dname)
210 210 self.delete_file('inroot.ipynb')
211 211
212 212 def test_list_notebooks(self):
213 213 nbs = notebooks_only(self.api.list().json())
214 214 self.assertEqual(len(nbs), 1)
215 215 self.assertEqual(nbs[0]['name'], 'inroot.ipynb')
216 216
217 217 nbs = notebooks_only(self.api.list('/Directory with spaces in/').json())
218 218 self.assertEqual(len(nbs), 1)
219 219 self.assertEqual(nbs[0]['name'], 'inspace.ipynb')
220 220
221 221 nbs = notebooks_only(self.api.list(u'/unicodΓ©/').json())
222 222 self.assertEqual(len(nbs), 1)
223 223 self.assertEqual(nbs[0]['name'], 'innonascii.ipynb')
224 224 self.assertEqual(nbs[0]['path'], u'unicodΓ©/innonascii.ipynb')
225 225
226 226 nbs = notebooks_only(self.api.list('/foo/bar/').json())
227 227 self.assertEqual(len(nbs), 1)
228 228 self.assertEqual(nbs[0]['name'], 'baz.ipynb')
229 229 self.assertEqual(nbs[0]['path'], 'foo/bar/baz.ipynb')
230 230
231 231 nbs = notebooks_only(self.api.list('foo').json())
232 232 self.assertEqual(len(nbs), 4)
233 233 nbnames = { normalize('NFC', n['name']) for n in nbs }
234 234 expected = [ u'a.ipynb', u'b.ipynb', u'name with spaces.ipynb', u'unicodΓ©.ipynb']
235 235 expected = { normalize('NFC', name) for name in expected }
236 236 self.assertEqual(nbnames, expected)
237 237
238 238 nbs = notebooks_only(self.api.list('ordering').json())
239 239 nbnames = [n['name'] for n in nbs]
240 240 expected = ['A.ipynb', 'b.ipynb', 'C.ipynb']
241 241 self.assertEqual(nbnames, expected)
242 242
243 243 def test_list_dirs(self):
244 244 dirs = dirs_only(self.api.list().json())
245 245 dir_names = {normalize('NFC', d['name']) for d in dirs}
246 246 self.assertEqual(dir_names, self.top_level_dirs) # Excluding hidden dirs
247 247
248 248 def test_get_dir_no_content(self):
249 249 for d in self.dirs:
250 250 model = self.api.read(d, content=False).json()
251 251 self.assertEqual(model['path'], d)
252 252 self.assertEqual(model['type'], 'directory')
253 253 self.assertIn('content', model)
254 254 self.assertEqual(model['content'], None)
255 255
256 256 def test_list_nonexistant_dir(self):
257 257 with assert_http_error(404):
258 258 self.api.list('nonexistant')
259 259
260 260 def test_get_nb_contents(self):
261 261 for d, name in self.dirs_nbs:
262 262 path = url_path_join(d, name + '.ipynb')
263 263 nb = self.api.read(path).json()
264 264 self.assertEqual(nb['name'], u'%s.ipynb' % name)
265 265 self.assertEqual(nb['path'], path)
266 266 self.assertEqual(nb['type'], 'notebook')
267 267 self.assertIn('content', nb)
268 268 self.assertEqual(nb['format'], 'json')
269 269 self.assertIn('metadata', nb['content'])
270 270 self.assertIsInstance(nb['content']['metadata'], dict)
271 271
272 272 def test_get_nb_no_content(self):
273 273 for d, name in self.dirs_nbs:
274 274 path = url_path_join(d, name + '.ipynb')
275 275 nb = self.api.read(path, content=False).json()
276 276 self.assertEqual(nb['name'], u'%s.ipynb' % name)
277 277 self.assertEqual(nb['path'], path)
278 278 self.assertEqual(nb['type'], 'notebook')
279 279 self.assertIn('content', nb)
280 280 self.assertEqual(nb['content'], None)
281 281
282 282 def test_get_contents_no_such_file(self):
283 283 # Name that doesn't exist - should be a 404
284 284 with assert_http_error(404):
285 285 self.api.read('foo/q.ipynb')
286 286
287 287 def test_get_text_file_contents(self):
288 288 for d, name in self.dirs_nbs:
289 289 path = url_path_join(d, name + '.txt')
290 290 model = self.api.read(path).json()
291 291 self.assertEqual(model['name'], u'%s.txt' % name)
292 292 self.assertEqual(model['path'], path)
293 293 self.assertIn('content', model)
294 294 self.assertEqual(model['format'], 'text')
295 295 self.assertEqual(model['type'], 'file')
296 296 self.assertEqual(model['content'], self._txt_for_name(name))
297 297
298 298 # Name that doesn't exist - should be a 404
299 299 with assert_http_error(404):
300 300 self.api.read('foo/q.txt')
301 301
302 302 # Specifying format=text should fail on a non-UTF-8 file
303 303 with assert_http_error(400):
304 304 self.api.read('foo/bar/baz.blob', type='file', format='text')
305 305
306 306 def test_get_binary_file_contents(self):
307 307 for d, name in self.dirs_nbs:
308 308 path = url_path_join(d, name + '.blob')
309 309 model = self.api.read(path).json()
310 310 self.assertEqual(model['name'], u'%s.blob' % name)
311 311 self.assertEqual(model['path'], path)
312 312 self.assertIn('content', model)
313 313 self.assertEqual(model['format'], 'base64')
314 314 self.assertEqual(model['type'], 'file')
315 315 self.assertEqual(
316 316 base64.decodestring(model['content'].encode('ascii')),
317 317 self._blob_for_name(name),
318 318 )
319 319
320 320 # Name that doesn't exist - should be a 404
321 321 with assert_http_error(404):
322 322 self.api.read('foo/q.txt')
323 323
324 324 def test_get_bad_type(self):
325 325 with assert_http_error(400):
326 326 self.api.read(u'unicodΓ©', type='file') # this is a directory
327 327
328 328 with assert_http_error(400):
329 329 self.api.read(u'unicodΓ©/innonascii.ipynb', type='directory')
330 330
331 331 def _check_created(self, resp, path, type='notebook'):
332 332 self.assertEqual(resp.status_code, 201)
333 333 location_header = py3compat.str_to_unicode(resp.headers['Location'])
334 334 self.assertEqual(location_header, url_escape(url_path_join(u'/api/contents', path)))
335 335 rjson = resp.json()
336 336 self.assertEqual(rjson['name'], path.rsplit('/', 1)[-1])
337 337 self.assertEqual(rjson['path'], path)
338 338 self.assertEqual(rjson['type'], type)
339 339 isright = self.isdir if type == 'directory' else self.isfile
340 340 assert isright(path)
341 341
342 342 def test_create_untitled(self):
343 343 resp = self.api.create_untitled(path=u'Γ₯ b')
344 344 self._check_created(resp, u'Γ₯ b/Untitled.ipynb')
345 345
346 346 # Second time
347 347 resp = self.api.create_untitled(path=u'Γ₯ b')
348 348 self._check_created(resp, u'Γ₯ b/Untitled1.ipynb')
349 349
350 350 # And two directories down
351 351 resp = self.api.create_untitled(path='foo/bar')
352 352 self._check_created(resp, 'foo/bar/Untitled.ipynb')
353 353
354 354 def test_create_untitled_txt(self):
355 355 resp = self.api.create_untitled(path='foo/bar', ext='.txt')
356 356 self._check_created(resp, 'foo/bar/untitled.txt', type='file')
357 357
358 358 resp = self.api.read(path='foo/bar/untitled.txt')
359 359 model = resp.json()
360 360 self.assertEqual(model['type'], 'file')
361 361 self.assertEqual(model['format'], 'text')
362 362 self.assertEqual(model['content'], '')
363 363
364 364 def test_upload(self):
365 365 nb = new_notebook()
366 366 nbmodel = {'content': nb, 'type': 'notebook'}
367 367 path = u'Γ₯ b/Upload tΓ©st.ipynb'
368 368 resp = self.api.upload(path, body=json.dumps(nbmodel))
369 369 self._check_created(resp, path)
370 370
371 371 def test_mkdir_untitled(self):
372 372 resp = self.api.mkdir_untitled(path=u'Γ₯ b')
373 373 self._check_created(resp, u'Γ₯ b/Untitled Folder', type='directory')
374 374
375 375 # Second time
376 376 resp = self.api.mkdir_untitled(path=u'Γ₯ b')
377 377 self._check_created(resp, u'Γ₯ b/Untitled Folder 1', type='directory')
378 378
379 379 # And two directories down
380 380 resp = self.api.mkdir_untitled(path='foo/bar')
381 381 self._check_created(resp, 'foo/bar/Untitled Folder', type='directory')
382 382
383 383 def test_mkdir(self):
384 384 path = u'Γ₯ b/New βˆ‚ir'
385 385 resp = self.api.mkdir(path)
386 386 self._check_created(resp, path, type='directory')
387 387
388 388 def test_mkdir_hidden_400(self):
389 389 with assert_http_error(400):
390 390 resp = self.api.mkdir(u'Γ₯ b/.hidden')
391 391
392 392 def test_upload_txt(self):
393 393 body = u'ΓΌnicode tΓ©xt'
394 394 model = {
395 395 'content' : body,
396 396 'format' : 'text',
397 397 'type' : 'file',
398 398 }
399 399 path = u'Γ₯ b/Upload tΓ©st.txt'
400 400 resp = self.api.upload(path, body=json.dumps(model))
401 401
402 402 # check roundtrip
403 403 resp = self.api.read(path)
404 404 model = resp.json()
405 405 self.assertEqual(model['type'], 'file')
406 406 self.assertEqual(model['format'], 'text')
407 407 self.assertEqual(model['content'], body)
408 408
409 409 def test_upload_b64(self):
410 410 body = b'\xFFblob'
411 411 b64body = base64.encodestring(body).decode('ascii')
412 412 model = {
413 413 'content' : b64body,
414 414 'format' : 'base64',
415 415 'type' : 'file',
416 416 }
417 417 path = u'Γ₯ b/Upload tΓ©st.blob'
418 418 resp = self.api.upload(path, body=json.dumps(model))
419 419
420 420 # check roundtrip
421 421 resp = self.api.read(path)
422 422 model = resp.json()
423 423 self.assertEqual(model['type'], 'file')
424 424 self.assertEqual(model['path'], path)
425 425 self.assertEqual(model['format'], 'base64')
426 426 decoded = base64.decodestring(model['content'].encode('ascii'))
427 427 self.assertEqual(decoded, body)
428 428
429 429 def test_upload_v2(self):
430 430 nb = v2.new_notebook()
431 431 ws = v2.new_worksheet()
432 432 nb.worksheets.append(ws)
433 433 ws.cells.append(v2.new_code_cell(input='print("hi")'))
434 434 nbmodel = {'content': nb, 'type': 'notebook'}
435 435 path = u'Γ₯ b/Upload tΓ©st.ipynb'
436 436 resp = self.api.upload(path, body=json.dumps(nbmodel))
437 437 self._check_created(resp, path)
438 438 resp = self.api.read(path)
439 439 data = resp.json()
440 440 self.assertEqual(data['content']['nbformat'], 4)
441 441
442 442 def test_copy(self):
443 443 resp = self.api.copy(u'Γ₯ b/Γ§ d.ipynb', u'Γ₯ b')
444 444 self._check_created(resp, u'Γ₯ b/Γ§ d-Copy1.ipynb')
445 445
446 446 resp = self.api.copy(u'Γ₯ b/Γ§ d.ipynb', u'Γ₯ b')
447 447 self._check_created(resp, u'Γ₯ b/Γ§ d-Copy2.ipynb')
448 448
449 449 def test_copy_copy(self):
450 450 resp = self.api.copy(u'Γ₯ b/Γ§ d.ipynb', u'Γ₯ b')
451 451 self._check_created(resp, u'Γ₯ b/Γ§ d-Copy1.ipynb')
452 452
453 453 resp = self.api.copy(u'Γ₯ b/Γ§ d-Copy1.ipynb', u'Γ₯ b')
454 454 self._check_created(resp, u'Γ₯ b/Γ§ d-Copy2.ipynb')
455 455
456 456 def test_copy_path(self):
457 457 resp = self.api.copy(u'foo/a.ipynb', u'Γ₯ b')
458 458 self._check_created(resp, u'Γ₯ b/a.ipynb')
459 459
460 460 resp = self.api.copy(u'foo/a.ipynb', u'Γ₯ b')
461 461 self._check_created(resp, u'Γ₯ b/a-Copy1.ipynb')
462 462
463 463 def test_copy_put_400(self):
464 464 with assert_http_error(400):
465 465 resp = self.api.copy_put(u'Γ₯ b/Γ§ d.ipynb', u'Γ₯ b/cΓΈpy.ipynb')
466 466
467 467 def test_copy_dir_400(self):
468 468 # can't copy directories
469 469 with assert_http_error(400):
470 470 resp = self.api.copy(u'Γ₯ b', u'foo')
471 471
472 472 def test_delete(self):
473 473 for d, name in self.dirs_nbs:
474 474 print('%r, %r' % (d, name))
475 475 resp = self.api.delete(url_path_join(d, name + '.ipynb'))
476 476 self.assertEqual(resp.status_code, 204)
477 477
478 478 for d in self.dirs + ['/']:
479 479 nbs = notebooks_only(self.api.list(d).json())
480 480 print('------')
481 481 print(d)
482 482 print(nbs)
483 483 self.assertEqual(nbs, [])
484 484
485 485 def test_delete_dirs(self):
486 486 # depth-first delete everything, so we don't try to delete empty directories
487 487 for name in sorted(self.dirs + ['/'], key=len, reverse=True):
488 488 listing = self.api.list(name).json()['content']
489 489 for model in listing:
490 490 self.api.delete(model['path'])
491 491 listing = self.api.list('/').json()['content']
492 492 self.assertEqual(listing, [])
493 493
494 494 def test_delete_non_empty_dir(self):
495 495 """delete non-empty dir raises 400"""
496 496 with assert_http_error(400):
497 497 self.api.delete(u'Γ₯ b')
498 498
499 499 def test_rename(self):
500 500 resp = self.api.rename('foo/a.ipynb', 'foo/z.ipynb')
501 501 self.assertEqual(resp.headers['Location'].split('/')[-1], 'z.ipynb')
502 502 self.assertEqual(resp.json()['name'], 'z.ipynb')
503 503 self.assertEqual(resp.json()['path'], 'foo/z.ipynb')
504 504 assert self.isfile('foo/z.ipynb')
505 505
506 506 nbs = notebooks_only(self.api.list('foo').json())
507 507 nbnames = set(n['name'] for n in nbs)
508 508 self.assertIn('z.ipynb', nbnames)
509 509 self.assertNotIn('a.ipynb', nbnames)
510 510
511 def test_checkpoints_follow_file(self):
512
513 # Read initial file state
514 orig = self.api.read('foo/a.ipynb')
515
516 # Create a checkpoint of initial state
517 r = self.api.new_checkpoint('foo/a.ipynb')
518 cp1 = r.json()
519
520 # Modify file and save
521 nbcontent = json.loads(orig.text)['content']
522 nb = from_dict(nbcontent)
523 hcell = new_markdown_cell('Created by test')
524 nb.cells.append(hcell)
525 nbmodel = {'content': nb, 'type': 'notebook'}
526 self.api.save('foo/a.ipynb', body=json.dumps(nbmodel))
527
528 # Rename the file.
529 self.api.rename('foo/a.ipynb', 'foo/z.ipynb')
530
531 # Looking for checkpoints in the old location should yield no results.
532 self.assertEqual(self.api.get_checkpoints('foo/a.ipynb').json(), [])
533
534 # Looking for checkpoints in the new location should work.
535 cps = self.api.get_checkpoints('foo/z.ipynb').json()
536 self.assertEqual(cps, [cp1])
537
538 # Delete the file. The checkpoint should be deleted as well.
539 self.api.delete('foo/z.ipynb')
540 cps = self.api.get_checkpoints('foo/z.ipynb').json()
541 self.assertEqual(cps, [])
542
511 543 def test_rename_existing(self):
512 544 with assert_http_error(409):
513 545 self.api.rename('foo/a.ipynb', 'foo/b.ipynb')
514 546
515 547 def test_save(self):
516 548 resp = self.api.read('foo/a.ipynb')
517 549 nbcontent = json.loads(resp.text)['content']
518 550 nb = from_dict(nbcontent)
519 551 nb.cells.append(new_markdown_cell(u'Created by test Β³'))
520 552
521 553 nbmodel= {'content': nb, 'type': 'notebook'}
522 554 resp = self.api.save('foo/a.ipynb', body=json.dumps(nbmodel))
523 555
524 556 nbcontent = self.api.read('foo/a.ipynb').json()['content']
525 557 newnb = from_dict(nbcontent)
526 558 self.assertEqual(newnb.cells[0].source,
527 559 u'Created by test Β³')
528 560
529 561 def test_checkpoints(self):
530 562 resp = self.api.read('foo/a.ipynb')
531 563 r = self.api.new_checkpoint('foo/a.ipynb')
532 564 self.assertEqual(r.status_code, 201)
533 565 cp1 = r.json()
534 566 self.assertEqual(set(cp1), {'id', 'last_modified'})
535 567 self.assertEqual(r.headers['Location'].split('/')[-1], cp1['id'])
536 568
537 569 # Modify it
538 570 nbcontent = json.loads(resp.text)['content']
539 571 nb = from_dict(nbcontent)
540 572 hcell = new_markdown_cell('Created by test')
541 573 nb.cells.append(hcell)
542 574 # Save
543 575 nbmodel= {'content': nb, 'type': 'notebook'}
544 576 resp = self.api.save('foo/a.ipynb', body=json.dumps(nbmodel))
545 577
546 578 # List checkpoints
547 579 cps = self.api.get_checkpoints('foo/a.ipynb').json()
548 580 self.assertEqual(cps, [cp1])
549 581
550 582 nbcontent = self.api.read('foo/a.ipynb').json()['content']
551 583 nb = from_dict(nbcontent)
552 584 self.assertEqual(nb.cells[0].source, 'Created by test')
553 585
554 586 # Restore cp1
555 587 r = self.api.restore_checkpoint('foo/a.ipynb', cp1['id'])
556 588 self.assertEqual(r.status_code, 204)
557 589 nbcontent = self.api.read('foo/a.ipynb').json()['content']
558 590 nb = from_dict(nbcontent)
559 591 self.assertEqual(nb.cells, [])
560 592
561 593 # Delete cp1
562 594 r = self.api.delete_checkpoint('foo/a.ipynb', cp1['id'])
563 595 self.assertEqual(r.status_code, 204)
564 596 cps = self.api.get_checkpoints('foo/a.ipynb').json()
565 597 self.assertEqual(cps, [])
566 598
567 599 def test_file_checkpoints(self):
568 600 """
569 601 Test checkpointing of non-notebook files.
570 602 """
571 603 filename = 'foo/a.txt'
572 604 resp = self.api.read(filename)
573 605 orig_content = json.loads(resp.text)['content']
574 606
575 607 # Create a checkpoint.
576 608 r = self.api.new_checkpoint(filename)
577 609 self.assertEqual(r.status_code, 201)
578 610 cp1 = r.json()
579 611 self.assertEqual(set(cp1), {'id', 'last_modified'})
580 612 self.assertEqual(r.headers['Location'].split('/')[-1], cp1['id'])
581 613
582 614 # Modify the file and save.
583 615 new_content = orig_content + '\nsecond line'
584 616 model = {
585 617 'content': new_content,
586 618 'type': 'file',
587 619 'format': 'text',
588 620 }
589 621 resp = self.api.save(filename, body=json.dumps(model))
590 622
591 623 # List checkpoints
592 624 cps = self.api.get_checkpoints(filename).json()
593 625 self.assertEqual(cps, [cp1])
594 626
595 627 content = self.api.read(filename).json()['content']
596 628 self.assertEqual(content, new_content)
597 629
598 630 # Restore cp1
599 631 r = self.api.restore_checkpoint(filename, cp1['id'])
600 632 self.assertEqual(r.status_code, 204)
601 633 restored_content = self.api.read(filename).json()['content']
602 634 self.assertEqual(restored_content, orig_content)
603 635
604 636 # Delete cp1
605 637 r = self.api.delete_checkpoint(filename, cp1['id'])
606 638 self.assertEqual(r.status_code, 204)
607 639 cps = self.api.get_checkpoints(filename).json()
608 640 self.assertEqual(cps, [])
609 641
610 642 @contextmanager
611 643 def patch_cp_root(self, dirname):
612 644 """
613 645 Temporarily patch the root dir of our checkpoint manager.
614 646 """
615 647 cpm = self.notebook.contents_manager.checkpoints
616 648 old_dirname = cpm.root_dir
617 649 cpm.root_dir = dirname
618 650 try:
619 651 yield
620 652 finally:
621 653 cpm.root_dir = old_dirname
622 654
623 655 def test_checkpoints_separate_root(self):
624 656 """
625 657 Test that FileCheckpoints functions correctly even when it's
626 658 using a different root dir from FileContentsManager. This also keeps
627 659 the implementation honest for use with ContentsManagers that don't map
628 660 models to the filesystem
629 661
630 662 Override this method to a no-op when testing other managers.
631 663 """
632 664 with TemporaryDirectory() as td:
633 665 with self.patch_cp_root(td):
634 666 self.test_checkpoints()
635 667
636 668 with TemporaryDirectory() as td:
637 669 with self.patch_cp_root(td):
638 670 self.test_file_checkpoints()
639 671
640 672
641 673 class GenericFileCheckpointsAPITest(APITest):
642 674 """
643 675 Run the tests from APITest with GenericFileCheckpoints.
644 676 """
645 677 config = Config()
646 678 config.FileContentsManager.checkpoints_class = GenericFileCheckpoints
647 679
648 680 def test_config_did_something(self):
649 681
650 682 self.assertIsInstance(
651 683 self.notebook.contents_manager.checkpoints,
652 684 GenericFileCheckpoints,
653 685 )
654 686
655 687
General Comments 0
You need to be logged in to leave comments. Login now