##// END OF EJS Templates
TEST: Abstract out directory/file deletion methods.
Scott Sanderson -
Show More
@@ -1,536 +1,542
1 1 # coding: utf-8
2 2 """Test the contents webservice API."""
3 3
4 4 import base64
5 5 import io
6 6 import json
7 7 import os
8 8 import shutil
9 9 from unicodedata import normalize
10 10
11 11 pjoin = os.path.join
12 12
13 13 import requests
14 14
15 15 from IPython.html.utils import url_path_join, url_escape, to_os_path
16 16 from IPython.html.tests.launchnotebook import NotebookTestBase, assert_http_error
17 17 from IPython.nbformat import read, write, from_dict
18 18 from IPython.nbformat.v4 import (
19 19 new_notebook, new_markdown_cell,
20 20 )
21 21 from IPython.nbformat import v2
22 22 from IPython.utils import py3compat
23 23 from IPython.utils.data import uniq_stable
24 24
25 25
26 26 def notebooks_only(dir_model):
27 27 return [nb for nb in dir_model['content'] if nb['type']=='notebook']
28 28
29 29 def dirs_only(dir_model):
30 30 return [x for x in dir_model['content'] if x['type']=='directory']
31 31
32 32
33 33 class API(object):
34 34 """Wrapper for contents API calls."""
35 35 def __init__(self, base_url):
36 36 self.base_url = base_url
37 37
38 38 def _req(self, verb, path, body=None, params=None):
39 39 response = requests.request(verb,
40 40 url_path_join(self.base_url, 'api/contents', path),
41 41 data=body, params=params,
42 42 )
43 43 response.raise_for_status()
44 44 return response
45 45
46 46 def list(self, path='/'):
47 47 return self._req('GET', path)
48 48
49 49 def read(self, path, type=None, format=None):
50 50 params = {}
51 51 if type is not None:
52 52 params['type'] = type
53 53 if format is not None:
54 54 params['format'] = format
55 55 return self._req('GET', path, params=params)
56 56
57 57 def create_untitled(self, path='/', ext='.ipynb'):
58 58 body = None
59 59 if ext:
60 60 body = json.dumps({'ext': ext})
61 61 return self._req('POST', path, body)
62 62
63 63 def mkdir_untitled(self, path='/'):
64 64 return self._req('POST', path, json.dumps({'type': 'directory'}))
65 65
66 66 def copy(self, copy_from, path='/'):
67 67 body = json.dumps({'copy_from':copy_from})
68 68 return self._req('POST', path, body)
69 69
70 70 def create(self, path='/'):
71 71 return self._req('PUT', path)
72 72
73 73 def upload(self, path, body):
74 74 return self._req('PUT', path, body)
75 75
76 76 def mkdir(self, path='/'):
77 77 return self._req('PUT', path, json.dumps({'type': 'directory'}))
78 78
79 79 def copy_put(self, copy_from, path='/'):
80 80 body = json.dumps({'copy_from':copy_from})
81 81 return self._req('PUT', path, body)
82 82
83 83 def save(self, path, body):
84 84 return self._req('PUT', path, body)
85 85
86 86 def delete(self, path='/'):
87 87 return self._req('DELETE', path)
88 88
89 89 def rename(self, path, new_path):
90 90 body = json.dumps({'path': new_path})
91 91 return self._req('PATCH', path, body)
92 92
93 93 def get_checkpoints(self, path):
94 94 return self._req('GET', url_path_join(path, 'checkpoints'))
95 95
96 96 def new_checkpoint(self, path):
97 97 return self._req('POST', url_path_join(path, 'checkpoints'))
98 98
99 99 def restore_checkpoint(self, path, checkpoint_id):
100 100 return self._req('POST', url_path_join(path, 'checkpoints', checkpoint_id))
101 101
102 102 def delete_checkpoint(self, path, checkpoint_id):
103 103 return self._req('DELETE', url_path_join(path, 'checkpoints', checkpoint_id))
104 104
105 105 class APITest(NotebookTestBase):
106 106 """Test the kernels web service API"""
107 107 dirs_nbs = [('', 'inroot'),
108 108 ('Directory with spaces in', 'inspace'),
109 109 (u'unicodé', 'innonascii'),
110 110 ('foo', 'a'),
111 111 ('foo', 'b'),
112 112 ('foo', 'name with spaces'),
113 113 ('foo', u'unicodé'),
114 114 ('foo/bar', 'baz'),
115 115 ('ordering', 'A'),
116 116 ('ordering', 'b'),
117 117 ('ordering', 'C'),
118 118 (u'å b', u'ç d'),
119 119 ]
120 120 hidden_dirs = ['.hidden', '__pycache__']
121 121
122 122 dirs = uniq_stable([py3compat.cast_unicode(d) for (d,n) in dirs_nbs])
123 123 del dirs[0] # remove ''
124 124 top_level_dirs = {normalize('NFC', d.split('/')[0]) for d in dirs}
125 125
126 126 @staticmethod
127 127 def _blob_for_name(name):
128 128 return name.encode('utf-8') + b'\xFF'
129 129
130 130 @staticmethod
131 131 def _txt_for_name(name):
132 132 return u'%s text file' % name
133 133
134 134 def to_os_path(self, api_path):
135 135 return to_os_path(api_path, root=self.notebook_dir.name)
136 136
137 137 def make_dir(self, api_path):
138 138 """Create a directory at api_path"""
139 139 os_path = self.to_os_path(api_path)
140 140 try:
141 141 os.makedirs(os_path)
142 142 except OSError:
143 143 print("Directory already exists: %r" % os_path)
144
144
145 145 def make_txt(self, api_path, txt):
146 146 """Make a text file at a given api_path"""
147 147 os_path = self.to_os_path(api_path)
148 148 with io.open(os_path, 'w', encoding='utf-8') as f:
149 149 f.write(txt)
150 150
151 151 def make_blob(self, api_path, blob):
152 152 """Make a binary file at a given api_path"""
153 153 os_path = self.to_os_path(api_path)
154 154 with io.open(os_path, 'wb') as f:
155 155 f.write(blob)
156 156
157 157 def make_nb(self, api_path, nb):
158 158 """Make a notebook file at a given api_path"""
159 159 os_path = self.to_os_path(api_path)
160 160
161 161 with io.open(os_path, 'w', encoding='utf-8') as f:
162 162 write(nb, f, version=4)
163
164 def delete_dir(self, api_path):
165 """Delete a directory at api_path, removing any contents."""
166 os_path = self.to_os_path(api_path)
167 shutil.rmtree(os_path, ignore_errors=True)
168
169 def delete_file(self, api_path):
170 """Delete a file at the given path if it exists."""
171 if self.isfile(api_path):
172 os.unlink(self.to_os_path(api_path))
163 173
164 174 def isfile(self, api_path):
165 175 return os.path.isfile(self.to_os_path(api_path))
166 176
167 177 def isdir(self, api_path):
168 178 return os.path.isdir(self.to_os_path(api_path))
169 179
170 180 def setUp(self):
171 181 self.blob = os.urandom(100)
172 182 self.b64_blob = base64.encodestring(self.blob).decode('ascii')
173 183
174 184 for d in (self.dirs + self.hidden_dirs):
175 185 self.make_dir(d)
176 186
177 187 for d, name in self.dirs_nbs:
178 188 # create a notebook
179 189 nb = new_notebook()
180 190 self.make_nb(u'{}/{}.ipynb'.format(d, name), nb)
181 191
182 192 # create a text file
183 193 txt = self._txt_for_name(name)
184 194 self.make_txt(u'{}/{}.txt'.format(d, name), txt)
185 195
186 196 # create a binary file
187 197 blob = self._blob_for_name(name)
188 198 self.make_blob(u'{}/{}.blob'.format(d, name), blob)
189 199
190 200 self.api = API(self.base_url())
191 201
192 202 def tearDown(self):
193 nbdir = self.notebook_dir.name
194
195 203 for dname in (list(self.top_level_dirs) + self.hidden_dirs):
196 shutil.rmtree(pjoin(nbdir, dname), ignore_errors=True)
197
198 if os.path.isfile(pjoin(nbdir, 'inroot.ipynb')):
199 os.unlink(pjoin(nbdir, 'inroot.ipynb'))
204 self.delete_dir(dname)
205 self.delete_file('inroot.ipynb')
200 206
201 207 def test_list_notebooks(self):
202 208 nbs = notebooks_only(self.api.list().json())
203 209 self.assertEqual(len(nbs), 1)
204 210 self.assertEqual(nbs[0]['name'], 'inroot.ipynb')
205 211
206 212 nbs = notebooks_only(self.api.list('/Directory with spaces in/').json())
207 213 self.assertEqual(len(nbs), 1)
208 214 self.assertEqual(nbs[0]['name'], 'inspace.ipynb')
209 215
210 216 nbs = notebooks_only(self.api.list(u'/unicodé/').json())
211 217 self.assertEqual(len(nbs), 1)
212 218 self.assertEqual(nbs[0]['name'], 'innonascii.ipynb')
213 219 self.assertEqual(nbs[0]['path'], u'unicodé/innonascii.ipynb')
214 220
215 221 nbs = notebooks_only(self.api.list('/foo/bar/').json())
216 222 self.assertEqual(len(nbs), 1)
217 223 self.assertEqual(nbs[0]['name'], 'baz.ipynb')
218 224 self.assertEqual(nbs[0]['path'], 'foo/bar/baz.ipynb')
219 225
220 226 nbs = notebooks_only(self.api.list('foo').json())
221 227 self.assertEqual(len(nbs), 4)
222 228 nbnames = { normalize('NFC', n['name']) for n in nbs }
223 229 expected = [ u'a.ipynb', u'b.ipynb', u'name with spaces.ipynb', u'unicodé.ipynb']
224 230 expected = { normalize('NFC', name) for name in expected }
225 231 self.assertEqual(nbnames, expected)
226 232
227 233 nbs = notebooks_only(self.api.list('ordering').json())
228 234 nbnames = [n['name'] for n in nbs]
229 235 expected = ['A.ipynb', 'b.ipynb', 'C.ipynb']
230 236 self.assertEqual(nbnames, expected)
231 237
232 238 def test_list_dirs(self):
233 239 dirs = dirs_only(self.api.list().json())
234 240 dir_names = {normalize('NFC', d['name']) for d in dirs}
235 241 self.assertEqual(dir_names, self.top_level_dirs) # Excluding hidden dirs
236 242
237 243 def test_list_nonexistant_dir(self):
238 244 with assert_http_error(404):
239 245 self.api.list('nonexistant')
240 246
241 247 def test_get_nb_contents(self):
242 248 for d, name in self.dirs_nbs:
243 249 path = url_path_join(d, name + '.ipynb')
244 250 nb = self.api.read(path).json()
245 251 self.assertEqual(nb['name'], u'%s.ipynb' % name)
246 252 self.assertEqual(nb['path'], path)
247 253 self.assertEqual(nb['type'], 'notebook')
248 254 self.assertIn('content', nb)
249 255 self.assertEqual(nb['format'], 'json')
250 256 self.assertIn('content', nb)
251 257 self.assertIn('metadata', nb['content'])
252 258 self.assertIsInstance(nb['content']['metadata'], dict)
253 259
254 260 def test_get_contents_no_such_file(self):
255 261 # Name that doesn't exist - should be a 404
256 262 with assert_http_error(404):
257 263 self.api.read('foo/q.ipynb')
258 264
259 265 def test_get_text_file_contents(self):
260 266 for d, name in self.dirs_nbs:
261 267 path = url_path_join(d, name + '.txt')
262 268 model = self.api.read(path).json()
263 269 self.assertEqual(model['name'], u'%s.txt' % name)
264 270 self.assertEqual(model['path'], path)
265 271 self.assertIn('content', model)
266 272 self.assertEqual(model['format'], 'text')
267 273 self.assertEqual(model['type'], 'file')
268 274 self.assertEqual(model['content'], self._txt_for_name(name))
269 275
270 276 # Name that doesn't exist - should be a 404
271 277 with assert_http_error(404):
272 278 self.api.read('foo/q.txt')
273 279
274 280 # Specifying format=text should fail on a non-UTF-8 file
275 281 with assert_http_error(400):
276 282 self.api.read('foo/bar/baz.blob', type='file', format='text')
277 283
278 284 def test_get_binary_file_contents(self):
279 285 for d, name in self.dirs_nbs:
280 286 path = url_path_join(d, name + '.blob')
281 287 model = self.api.read(path).json()
282 288 self.assertEqual(model['name'], u'%s.blob' % name)
283 289 self.assertEqual(model['path'], path)
284 290 self.assertIn('content', model)
285 291 self.assertEqual(model['format'], 'base64')
286 292 self.assertEqual(model['type'], 'file')
287 293 b64_data = base64.encodestring(self._blob_for_name(name)).decode('ascii')
288 294 self.assertEqual(model['content'], b64_data)
289 295
290 296 # Name that doesn't exist - should be a 404
291 297 with assert_http_error(404):
292 298 self.api.read('foo/q.txt')
293 299
294 300 def test_get_bad_type(self):
295 301 with assert_http_error(400):
296 302 self.api.read(u'unicodé', type='file') # this is a directory
297 303
298 304 with assert_http_error(400):
299 305 self.api.read(u'unicodé/innonascii.ipynb', type='directory')
300 306
301 307 def _check_created(self, resp, path, type='notebook'):
302 308 self.assertEqual(resp.status_code, 201)
303 309 location_header = py3compat.str_to_unicode(resp.headers['Location'])
304 310 self.assertEqual(location_header, url_escape(url_path_join(u'/api/contents', path)))
305 311 rjson = resp.json()
306 312 self.assertEqual(rjson['name'], path.rsplit('/', 1)[-1])
307 313 self.assertEqual(rjson['path'], path)
308 314 self.assertEqual(rjson['type'], type)
309 315 isright = self.isdir if type == 'directory' else self.isfile
310 316 assert isright(path)
311 317
312 318 def test_create_untitled(self):
313 319 resp = self.api.create_untitled(path=u'å b')
314 320 self._check_created(resp, u'å b/Untitled.ipynb')
315 321
316 322 # Second time
317 323 resp = self.api.create_untitled(path=u'å b')
318 324 self._check_created(resp, u'å b/Untitled1.ipynb')
319 325
320 326 # And two directories down
321 327 resp = self.api.create_untitled(path='foo/bar')
322 328 self._check_created(resp, 'foo/bar/Untitled.ipynb')
323 329
324 330 def test_create_untitled_txt(self):
325 331 resp = self.api.create_untitled(path='foo/bar', ext='.txt')
326 332 self._check_created(resp, 'foo/bar/untitled.txt', type='file')
327 333
328 334 resp = self.api.read(path='foo/bar/untitled.txt')
329 335 model = resp.json()
330 336 self.assertEqual(model['type'], 'file')
331 337 self.assertEqual(model['format'], 'text')
332 338 self.assertEqual(model['content'], '')
333 339
334 340 def test_upload(self):
335 341 nb = new_notebook()
336 342 nbmodel = {'content': nb, 'type': 'notebook'}
337 343 path = u'å b/Upload tést.ipynb'
338 344 resp = self.api.upload(path, body=json.dumps(nbmodel))
339 345 self._check_created(resp, path)
340 346
341 347 def test_mkdir_untitled(self):
342 348 resp = self.api.mkdir_untitled(path=u'å b')
343 349 self._check_created(resp, u'å b/Untitled Folder', type='directory')
344 350
345 351 # Second time
346 352 resp = self.api.mkdir_untitled(path=u'å b')
347 353 self._check_created(resp, u'å b/Untitled Folder 1', type='directory')
348 354
349 355 # And two directories down
350 356 resp = self.api.mkdir_untitled(path='foo/bar')
351 357 self._check_created(resp, 'foo/bar/Untitled Folder', type='directory')
352 358
353 359 def test_mkdir(self):
354 360 path = u'å b/New ∂ir'
355 361 resp = self.api.mkdir(path)
356 362 self._check_created(resp, path, type='directory')
357 363
358 364 def test_mkdir_hidden_400(self):
359 365 with assert_http_error(400):
360 366 resp = self.api.mkdir(u'å b/.hidden')
361 367
362 368 def test_upload_txt(self):
363 369 body = u'ünicode téxt'
364 370 model = {
365 371 'content' : body,
366 372 'format' : 'text',
367 373 'type' : 'file',
368 374 }
369 375 path = u'å b/Upload tést.txt'
370 376 resp = self.api.upload(path, body=json.dumps(model))
371 377
372 378 # check roundtrip
373 379 resp = self.api.read(path)
374 380 model = resp.json()
375 381 self.assertEqual(model['type'], 'file')
376 382 self.assertEqual(model['format'], 'text')
377 383 self.assertEqual(model['content'], body)
378 384
379 385 def test_upload_b64(self):
380 386 body = b'\xFFblob'
381 387 b64body = base64.encodestring(body).decode('ascii')
382 388 model = {
383 389 'content' : b64body,
384 390 'format' : 'base64',
385 391 'type' : 'file',
386 392 }
387 393 path = u'å b/Upload tést.blob'
388 394 resp = self.api.upload(path, body=json.dumps(model))
389 395
390 396 # check roundtrip
391 397 resp = self.api.read(path)
392 398 model = resp.json()
393 399 self.assertEqual(model['type'], 'file')
394 400 self.assertEqual(model['path'], path)
395 401 self.assertEqual(model['format'], 'base64')
396 402 decoded = base64.decodestring(model['content'].encode('ascii'))
397 403 self.assertEqual(decoded, body)
398 404
399 405 def test_upload_v2(self):
400 406 nb = v2.new_notebook()
401 407 ws = v2.new_worksheet()
402 408 nb.worksheets.append(ws)
403 409 ws.cells.append(v2.new_code_cell(input='print("hi")'))
404 410 nbmodel = {'content': nb, 'type': 'notebook'}
405 411 path = u'å b/Upload tést.ipynb'
406 412 resp = self.api.upload(path, body=json.dumps(nbmodel))
407 413 self._check_created(resp, path)
408 414 resp = self.api.read(path)
409 415 data = resp.json()
410 416 self.assertEqual(data['content']['nbformat'], 4)
411 417
412 418 def test_copy(self):
413 419 resp = self.api.copy(u'å b/ç d.ipynb', u'å b')
414 420 self._check_created(resp, u'å b/ç d-Copy1.ipynb')
415 421
416 422 resp = self.api.copy(u'å b/ç d.ipynb', u'å b')
417 423 self._check_created(resp, u'å b/ç d-Copy2.ipynb')
418 424
419 425 def test_copy_copy(self):
420 426 resp = self.api.copy(u'å b/ç d.ipynb', u'å b')
421 427 self._check_created(resp, u'å b/ç d-Copy1.ipynb')
422 428
423 429 resp = self.api.copy(u'å b/ç d-Copy1.ipynb', u'å b')
424 430 self._check_created(resp, u'å b/ç d-Copy2.ipynb')
425 431
426 432 def test_copy_path(self):
427 433 resp = self.api.copy(u'foo/a.ipynb', u'å b')
428 434 self._check_created(resp, u'å b/a.ipynb')
429 435
430 436 resp = self.api.copy(u'foo/a.ipynb', u'å b')
431 437 self._check_created(resp, u'å b/a-Copy1.ipynb')
432 438
433 439 def test_copy_put_400(self):
434 440 with assert_http_error(400):
435 441 resp = self.api.copy_put(u'å b/ç d.ipynb', u'å b/cøpy.ipynb')
436 442
437 443 def test_copy_dir_400(self):
438 444 # can't copy directories
439 445 with assert_http_error(400):
440 446 resp = self.api.copy(u'å b', u'foo')
441 447
442 448 def test_delete(self):
443 449 for d, name in self.dirs_nbs:
444 450 print('%r, %r' % (d, name))
445 451 resp = self.api.delete(url_path_join(d, name + '.ipynb'))
446 452 self.assertEqual(resp.status_code, 204)
447 453
448 454 for d in self.dirs + ['/']:
449 455 nbs = notebooks_only(self.api.list(d).json())
450 456 print('------')
451 457 print(d)
452 458 print(nbs)
453 459 self.assertEqual(nbs, [])
454 460
455 461 def test_delete_dirs(self):
456 462 # depth-first delete everything, so we don't try to delete empty directories
457 463 for name in sorted(self.dirs + ['/'], key=len, reverse=True):
458 464 listing = self.api.list(name).json()['content']
459 465 for model in listing:
460 466 self.api.delete(model['path'])
461 467 listing = self.api.list('/').json()['content']
462 468 self.assertEqual(listing, [])
463 469
464 470 def test_delete_non_empty_dir(self):
465 471 """delete non-empty dir raises 400"""
466 472 with assert_http_error(400):
467 473 self.api.delete(u'å b')
468 474
469 475 def test_rename(self):
470 476 resp = self.api.rename('foo/a.ipynb', 'foo/z.ipynb')
471 477 self.assertEqual(resp.headers['Location'].split('/')[-1], 'z.ipynb')
472 478 self.assertEqual(resp.json()['name'], 'z.ipynb')
473 479 self.assertEqual(resp.json()['path'], 'foo/z.ipynb')
474 480 assert self.isfile('foo/z.ipynb')
475 481
476 482 nbs = notebooks_only(self.api.list('foo').json())
477 483 nbnames = set(n['name'] for n in nbs)
478 484 self.assertIn('z.ipynb', nbnames)
479 485 self.assertNotIn('a.ipynb', nbnames)
480 486
481 487 def test_rename_existing(self):
482 488 with assert_http_error(409):
483 489 self.api.rename('foo/a.ipynb', 'foo/b.ipynb')
484 490
485 491 def test_save(self):
486 492 resp = self.api.read('foo/a.ipynb')
487 493 nbcontent = json.loads(resp.text)['content']
488 494 nb = from_dict(nbcontent)
489 495 nb.cells.append(new_markdown_cell(u'Created by test ³'))
490 496
491 497 nbmodel= {'content': nb, 'type': 'notebook'}
492 498 resp = self.api.save('foo/a.ipynb', body=json.dumps(nbmodel))
493 499
494 500 nbcontent = self.api.read('foo/a.ipynb').json()['content']
495 501 newnb = from_dict(nbcontent)
496 502 self.assertEqual(newnb.cells[0].source,
497 503 u'Created by test ³')
498 504
499 505
500 506 def test_checkpoints(self):
501 507 resp = self.api.read('foo/a.ipynb')
502 508 r = self.api.new_checkpoint('foo/a.ipynb')
503 509 self.assertEqual(r.status_code, 201)
504 510 cp1 = r.json()
505 511 self.assertEqual(set(cp1), {'id', 'last_modified'})
506 512 self.assertEqual(r.headers['Location'].split('/')[-1], cp1['id'])
507 513
508 514 # Modify it
509 515 nbcontent = json.loads(resp.text)['content']
510 516 nb = from_dict(nbcontent)
511 517 hcell = new_markdown_cell('Created by test')
512 518 nb.cells.append(hcell)
513 519 # Save
514 520 nbmodel= {'content': nb, 'type': 'notebook'}
515 521 resp = self.api.save('foo/a.ipynb', body=json.dumps(nbmodel))
516 522
517 523 # List checkpoints
518 524 cps = self.api.get_checkpoints('foo/a.ipynb').json()
519 525 self.assertEqual(cps, [cp1])
520 526
521 527 nbcontent = self.api.read('foo/a.ipynb').json()['content']
522 528 nb = from_dict(nbcontent)
523 529 self.assertEqual(nb.cells[0].source, 'Created by test')
524 530
525 531 # Restore cp1
526 532 r = self.api.restore_checkpoint('foo/a.ipynb', cp1['id'])
527 533 self.assertEqual(r.status_code, 204)
528 534 nbcontent = self.api.read('foo/a.ipynb').json()['content']
529 535 nb = from_dict(nbcontent)
530 536 self.assertEqual(nb.cells, [])
531 537
532 538 # Delete cp1
533 539 r = self.api.delete_checkpoint('foo/a.ipynb', cp1['id'])
534 540 self.assertEqual(r.status_code, 204)
535 541 cps = self.api.get_checkpoints('foo/a.ipynb').json()
536 542 self.assertEqual(cps, [])
General Comments 0
You need to be logged in to leave comments. Login now