##// END OF EJS Templates
don't use nbformat.current in IPython.html...
MinRK -
Show More
@@ -1,148 +1,148
1 """Tornado handlers for nbconvert."""
1 """Tornado handlers for nbconvert."""
2
2
3 # Copyright (c) IPython Development Team.
3 # Copyright (c) IPython Development Team.
4 # Distributed under the terms of the Modified BSD License.
4 # Distributed under the terms of the Modified BSD License.
5
5
6 import io
6 import io
7 import os
7 import os
8 import zipfile
8 import zipfile
9
9
10 from tornado import web
10 from tornado import web
11
11
12 from ..base.handlers import (
12 from ..base.handlers import (
13 IPythonHandler, FilesRedirectHandler,
13 IPythonHandler, FilesRedirectHandler,
14 notebook_path_regex, path_regex,
14 notebook_path_regex, path_regex,
15 )
15 )
16 from IPython.nbformat.current import from_dict
16 from IPython.nbformat import from_dict
17
17
18 from IPython.utils.py3compat import cast_bytes
18 from IPython.utils.py3compat import cast_bytes
19
19
20 def find_resource_files(output_files_dir):
20 def find_resource_files(output_files_dir):
21 files = []
21 files = []
22 for dirpath, dirnames, filenames in os.walk(output_files_dir):
22 for dirpath, dirnames, filenames in os.walk(output_files_dir):
23 files.extend([os.path.join(dirpath, f) for f in filenames])
23 files.extend([os.path.join(dirpath, f) for f in filenames])
24 return files
24 return files
25
25
26 def respond_zip(handler, name, output, resources):
26 def respond_zip(handler, name, output, resources):
27 """Zip up the output and resource files and respond with the zip file.
27 """Zip up the output and resource files and respond with the zip file.
28
28
29 Returns True if it has served a zip file, False if there are no resource
29 Returns True if it has served a zip file, False if there are no resource
30 files, in which case we serve the plain output file.
30 files, in which case we serve the plain output file.
31 """
31 """
32 # Check if we have resource files we need to zip
32 # Check if we have resource files we need to zip
33 output_files = resources.get('outputs', None)
33 output_files = resources.get('outputs', None)
34 if not output_files:
34 if not output_files:
35 return False
35 return False
36
36
37 # Headers
37 # Headers
38 zip_filename = os.path.splitext(name)[0] + '.zip'
38 zip_filename = os.path.splitext(name)[0] + '.zip'
39 handler.set_header('Content-Disposition',
39 handler.set_header('Content-Disposition',
40 'attachment; filename="%s"' % zip_filename)
40 'attachment; filename="%s"' % zip_filename)
41 handler.set_header('Content-Type', 'application/zip')
41 handler.set_header('Content-Type', 'application/zip')
42
42
43 # Prepare the zip file
43 # Prepare the zip file
44 buffer = io.BytesIO()
44 buffer = io.BytesIO()
45 zipf = zipfile.ZipFile(buffer, mode='w', compression=zipfile.ZIP_DEFLATED)
45 zipf = zipfile.ZipFile(buffer, mode='w', compression=zipfile.ZIP_DEFLATED)
46 output_filename = os.path.splitext(name)[0] + '.' + resources['output_extension']
46 output_filename = os.path.splitext(name)[0] + '.' + resources['output_extension']
47 zipf.writestr(output_filename, cast_bytes(output, 'utf-8'))
47 zipf.writestr(output_filename, cast_bytes(output, 'utf-8'))
48 for filename, data in output_files.items():
48 for filename, data in output_files.items():
49 zipf.writestr(os.path.basename(filename), data)
49 zipf.writestr(os.path.basename(filename), data)
50 zipf.close()
50 zipf.close()
51
51
52 handler.finish(buffer.getvalue())
52 handler.finish(buffer.getvalue())
53 return True
53 return True
54
54
55 def get_exporter(format, **kwargs):
55 def get_exporter(format, **kwargs):
56 """get an exporter, raising appropriate errors"""
56 """get an exporter, raising appropriate errors"""
57 # if this fails, will raise 500
57 # if this fails, will raise 500
58 try:
58 try:
59 from IPython.nbconvert.exporters.export import exporter_map
59 from IPython.nbconvert.exporters.export import exporter_map
60 except ImportError as e:
60 except ImportError as e:
61 raise web.HTTPError(500, "Could not import nbconvert: %s" % e)
61 raise web.HTTPError(500, "Could not import nbconvert: %s" % e)
62
62
63 try:
63 try:
64 Exporter = exporter_map[format]
64 Exporter = exporter_map[format]
65 except KeyError:
65 except KeyError:
66 # should this be 400?
66 # should this be 400?
67 raise web.HTTPError(404, u"No exporter for format: %s" % format)
67 raise web.HTTPError(404, u"No exporter for format: %s" % format)
68
68
69 try:
69 try:
70 return Exporter(**kwargs)
70 return Exporter(**kwargs)
71 except Exception as e:
71 except Exception as e:
72 raise web.HTTPError(500, "Could not construct Exporter: %s" % e)
72 raise web.HTTPError(500, "Could not construct Exporter: %s" % e)
73
73
74 class NbconvertFileHandler(IPythonHandler):
74 class NbconvertFileHandler(IPythonHandler):
75
75
76 SUPPORTED_METHODS = ('GET',)
76 SUPPORTED_METHODS = ('GET',)
77
77
78 @web.authenticated
78 @web.authenticated
79 def get(self, format, path='', name=None):
79 def get(self, format, path='', name=None):
80
80
81 exporter = get_exporter(format, config=self.config, log=self.log)
81 exporter = get_exporter(format, config=self.config, log=self.log)
82
82
83 path = path.strip('/')
83 path = path.strip('/')
84 model = self.contents_manager.get_model(name=name, path=path)
84 model = self.contents_manager.get_model(name=name, path=path)
85
85
86 self.set_header('Last-Modified', model['last_modified'])
86 self.set_header('Last-Modified', model['last_modified'])
87
87
88 try:
88 try:
89 output, resources = exporter.from_notebook_node(model['content'])
89 output, resources = exporter.from_notebook_node(model['content'])
90 except Exception as e:
90 except Exception as e:
91 raise web.HTTPError(500, "nbconvert failed: %s" % e)
91 raise web.HTTPError(500, "nbconvert failed: %s" % e)
92
92
93 if respond_zip(self, name, output, resources):
93 if respond_zip(self, name, output, resources):
94 return
94 return
95
95
96 # Force download if requested
96 # Force download if requested
97 if self.get_argument('download', 'false').lower() == 'true':
97 if self.get_argument('download', 'false').lower() == 'true':
98 filename = os.path.splitext(name)[0] + '.' + resources['output_extension']
98 filename = os.path.splitext(name)[0] + '.' + resources['output_extension']
99 self.set_header('Content-Disposition',
99 self.set_header('Content-Disposition',
100 'attachment; filename="%s"' % filename)
100 'attachment; filename="%s"' % filename)
101
101
102 # MIME type
102 # MIME type
103 if exporter.output_mimetype:
103 if exporter.output_mimetype:
104 self.set_header('Content-Type',
104 self.set_header('Content-Type',
105 '%s; charset=utf-8' % exporter.output_mimetype)
105 '%s; charset=utf-8' % exporter.output_mimetype)
106
106
107 self.finish(output)
107 self.finish(output)
108
108
109 class NbconvertPostHandler(IPythonHandler):
109 class NbconvertPostHandler(IPythonHandler):
110 SUPPORTED_METHODS = ('POST',)
110 SUPPORTED_METHODS = ('POST',)
111
111
112 @web.authenticated
112 @web.authenticated
113 def post(self, format):
113 def post(self, format):
114 exporter = get_exporter(format, config=self.config)
114 exporter = get_exporter(format, config=self.config)
115
115
116 model = self.get_json_body()
116 model = self.get_json_body()
117 name = model.get('name', 'notebook.ipynb')
117 name = model.get('name', 'notebook.ipynb')
118 nbnode = from_dict(model['content'])
118 nbnode = from_dict(model['content'])
119
119
120 try:
120 try:
121 output, resources = exporter.from_notebook_node(nbnode)
121 output, resources = exporter.from_notebook_node(nbnode)
122 except Exception as e:
122 except Exception as e:
123 raise web.HTTPError(500, "nbconvert failed: %s" % e)
123 raise web.HTTPError(500, "nbconvert failed: %s" % e)
124
124
125 if respond_zip(self, name, output, resources):
125 if respond_zip(self, name, output, resources):
126 return
126 return
127
127
128 # MIME type
128 # MIME type
129 if exporter.output_mimetype:
129 if exporter.output_mimetype:
130 self.set_header('Content-Type',
130 self.set_header('Content-Type',
131 '%s; charset=utf-8' % exporter.output_mimetype)
131 '%s; charset=utf-8' % exporter.output_mimetype)
132
132
133 self.finish(output)
133 self.finish(output)
134
134
135
135
136 #-----------------------------------------------------------------------------
136 #-----------------------------------------------------------------------------
137 # URL to handler mappings
137 # URL to handler mappings
138 #-----------------------------------------------------------------------------
138 #-----------------------------------------------------------------------------
139
139
140 _format_regex = r"(?P<format>\w+)"
140 _format_regex = r"(?P<format>\w+)"
141
141
142
142
143 default_handlers = [
143 default_handlers = [
144 (r"/nbconvert/%s%s" % (_format_regex, notebook_path_regex),
144 (r"/nbconvert/%s%s" % (_format_regex, notebook_path_regex),
145 NbconvertFileHandler),
145 NbconvertFileHandler),
146 (r"/nbconvert/%s" % _format_regex, NbconvertPostHandler),
146 (r"/nbconvert/%s" % _format_regex, NbconvertPostHandler),
147 (r"/nbconvert/html%s" % path_regex, FilesRedirectHandler),
147 (r"/nbconvert/html%s" % path_regex, FilesRedirectHandler),
148 ]
148 ]
@@ -1,131 +1,132
1 # coding: utf-8
1 # coding: utf-8
2 import base64
2 import base64
3 import io
3 import io
4 import json
4 import json
5 import os
5 import os
6 from os.path import join as pjoin
6 from os.path import join as pjoin
7 import shutil
7 import shutil
8
8
9 import requests
9 import requests
10
10
11 from IPython.html.utils import url_path_join
11 from IPython.html.utils import url_path_join
12 from IPython.html.tests.launchnotebook import NotebookTestBase, assert_http_error
12 from IPython.html.tests.launchnotebook import NotebookTestBase, assert_http_error
13 from IPython.nbformat.current import (new_notebook, write,
13 from IPython.nbformat import write
14 new_markdown_cell, new_code_cell,
14 from IPython.nbformat.v4 import (
15 new_output)
15 new_notebook, new_markdown_cell, new_code_cell, new_output,
16 )
16
17
17 from IPython.testing.decorators import onlyif_cmds_exist
18 from IPython.testing.decorators import onlyif_cmds_exist
18
19
19
20
20 class NbconvertAPI(object):
21 class NbconvertAPI(object):
21 """Wrapper for nbconvert API calls."""
22 """Wrapper for nbconvert API calls."""
22 def __init__(self, base_url):
23 def __init__(self, base_url):
23 self.base_url = base_url
24 self.base_url = base_url
24
25
25 def _req(self, verb, path, body=None, params=None):
26 def _req(self, verb, path, body=None, params=None):
26 response = requests.request(verb,
27 response = requests.request(verb,
27 url_path_join(self.base_url, 'nbconvert', path),
28 url_path_join(self.base_url, 'nbconvert', path),
28 data=body, params=params,
29 data=body, params=params,
29 )
30 )
30 response.raise_for_status()
31 response.raise_for_status()
31 return response
32 return response
32
33
33 def from_file(self, format, path, name, download=False):
34 def from_file(self, format, path, name, download=False):
34 return self._req('GET', url_path_join(format, path, name),
35 return self._req('GET', url_path_join(format, path, name),
35 params={'download':download})
36 params={'download':download})
36
37
37 def from_post(self, format, nbmodel):
38 def from_post(self, format, nbmodel):
38 body = json.dumps(nbmodel)
39 body = json.dumps(nbmodel)
39 return self._req('POST', format, body)
40 return self._req('POST', format, body)
40
41
41 def list_formats(self):
42 def list_formats(self):
42 return self._req('GET', '')
43 return self._req('GET', '')
43
44
44 png_green_pixel = base64.encodestring(b'\x89PNG\r\n\x1a\n\x00\x00\x00\rIHDR\x00'
45 png_green_pixel = base64.encodestring(b'\x89PNG\r\n\x1a\n\x00\x00\x00\rIHDR\x00'
45 b'\x00\x00\x01\x00\x00x00\x01\x08\x02\x00\x00\x00\x90wS\xde\x00\x00\x00\x0cIDAT'
46 b'\x00\x00\x01\x00\x00x00\x01\x08\x02\x00\x00\x00\x90wS\xde\x00\x00\x00\x0cIDAT'
46 b'\x08\xd7c\x90\xfb\xcf\x00\x00\x02\\\x01\x1e.~d\x87\x00\x00\x00\x00IEND\xaeB`\x82'
47 b'\x08\xd7c\x90\xfb\xcf\x00\x00\x02\\\x01\x1e.~d\x87\x00\x00\x00\x00IEND\xaeB`\x82'
47 ).decode('ascii')
48 ).decode('ascii')
48
49
49 class APITest(NotebookTestBase):
50 class APITest(NotebookTestBase):
50 def setUp(self):
51 def setUp(self):
51 nbdir = self.notebook_dir.name
52 nbdir = self.notebook_dir.name
52
53
53 if not os.path.isdir(pjoin(nbdir, 'foo')):
54 if not os.path.isdir(pjoin(nbdir, 'foo')):
54 os.mkdir(pjoin(nbdir, 'foo'))
55 os.mkdir(pjoin(nbdir, 'foo'))
55
56
56 nb = new_notebook()
57 nb = new_notebook()
57
58
58 nb.cells.append(new_markdown_cell(u'Created by test ³'))
59 nb.cells.append(new_markdown_cell(u'Created by test ³'))
59 cc1 = new_code_cell(source=u'print(2*6)')
60 cc1 = new_code_cell(source=u'print(2*6)')
60 cc1.outputs.append(new_output(output_type="stream", text=u'12'))
61 cc1.outputs.append(new_output(output_type="stream", text=u'12'))
61 cc1.outputs.append(new_output(output_type="execute_result",
62 cc1.outputs.append(new_output(output_type="execute_result",
62 data={'image/png' : png_green_pixel},
63 data={'image/png' : png_green_pixel},
63 execution_count=1,
64 execution_count=1,
64 ))
65 ))
65 nb.cells.append(cc1)
66 nb.cells.append(cc1)
66
67
67 with io.open(pjoin(nbdir, 'foo', 'testnb.ipynb'), 'w',
68 with io.open(pjoin(nbdir, 'foo', 'testnb.ipynb'), 'w',
68 encoding='utf-8') as f:
69 encoding='utf-8') as f:
69 write(nb, f)
70 write(f, nb, version=4)
70
71
71 self.nbconvert_api = NbconvertAPI(self.base_url())
72 self.nbconvert_api = NbconvertAPI(self.base_url())
72
73
73 def tearDown(self):
74 def tearDown(self):
74 nbdir = self.notebook_dir.name
75 nbdir = self.notebook_dir.name
75
76
76 for dname in ['foo']:
77 for dname in ['foo']:
77 shutil.rmtree(pjoin(nbdir, dname), ignore_errors=True)
78 shutil.rmtree(pjoin(nbdir, dname), ignore_errors=True)
78
79
79 @onlyif_cmds_exist('pandoc')
80 @onlyif_cmds_exist('pandoc')
80 def test_from_file(self):
81 def test_from_file(self):
81 r = self.nbconvert_api.from_file('html', 'foo', 'testnb.ipynb')
82 r = self.nbconvert_api.from_file('html', 'foo', 'testnb.ipynb')
82 self.assertEqual(r.status_code, 200)
83 self.assertEqual(r.status_code, 200)
83 self.assertIn(u'text/html', r.headers['Content-Type'])
84 self.assertIn(u'text/html', r.headers['Content-Type'])
84 self.assertIn(u'Created by test', r.text)
85 self.assertIn(u'Created by test', r.text)
85 self.assertIn(u'print', r.text)
86 self.assertIn(u'print', r.text)
86
87
87 r = self.nbconvert_api.from_file('python', 'foo', 'testnb.ipynb')
88 r = self.nbconvert_api.from_file('python', 'foo', 'testnb.ipynb')
88 self.assertIn(u'text/x-python', r.headers['Content-Type'])
89 self.assertIn(u'text/x-python', r.headers['Content-Type'])
89 self.assertIn(u'print(2*6)', r.text)
90 self.assertIn(u'print(2*6)', r.text)
90
91
91 @onlyif_cmds_exist('pandoc')
92 @onlyif_cmds_exist('pandoc')
92 def test_from_file_404(self):
93 def test_from_file_404(self):
93 with assert_http_error(404):
94 with assert_http_error(404):
94 self.nbconvert_api.from_file('html', 'foo', 'thisdoesntexist.ipynb')
95 self.nbconvert_api.from_file('html', 'foo', 'thisdoesntexist.ipynb')
95
96
96 @onlyif_cmds_exist('pandoc')
97 @onlyif_cmds_exist('pandoc')
97 def test_from_file_download(self):
98 def test_from_file_download(self):
98 r = self.nbconvert_api.from_file('python', 'foo', 'testnb.ipynb', download=True)
99 r = self.nbconvert_api.from_file('python', 'foo', 'testnb.ipynb', download=True)
99 content_disposition = r.headers['Content-Disposition']
100 content_disposition = r.headers['Content-Disposition']
100 self.assertIn('attachment', content_disposition)
101 self.assertIn('attachment', content_disposition)
101 self.assertIn('testnb.py', content_disposition)
102 self.assertIn('testnb.py', content_disposition)
102
103
103 @onlyif_cmds_exist('pandoc')
104 @onlyif_cmds_exist('pandoc')
104 def test_from_file_zip(self):
105 def test_from_file_zip(self):
105 r = self.nbconvert_api.from_file('latex', 'foo', 'testnb.ipynb', download=True)
106 r = self.nbconvert_api.from_file('latex', 'foo', 'testnb.ipynb', download=True)
106 self.assertIn(u'application/zip', r.headers['Content-Type'])
107 self.assertIn(u'application/zip', r.headers['Content-Type'])
107 self.assertIn(u'.zip', r.headers['Content-Disposition'])
108 self.assertIn(u'.zip', r.headers['Content-Disposition'])
108
109
109 @onlyif_cmds_exist('pandoc')
110 @onlyif_cmds_exist('pandoc')
110 def test_from_post(self):
111 def test_from_post(self):
111 nbmodel_url = url_path_join(self.base_url(), 'api/contents/foo/testnb.ipynb')
112 nbmodel_url = url_path_join(self.base_url(), 'api/contents/foo/testnb.ipynb')
112 nbmodel = requests.get(nbmodel_url).json()
113 nbmodel = requests.get(nbmodel_url).json()
113
114
114 r = self.nbconvert_api.from_post(format='html', nbmodel=nbmodel)
115 r = self.nbconvert_api.from_post(format='html', nbmodel=nbmodel)
115 self.assertEqual(r.status_code, 200)
116 self.assertEqual(r.status_code, 200)
116 self.assertIn(u'text/html', r.headers['Content-Type'])
117 self.assertIn(u'text/html', r.headers['Content-Type'])
117 self.assertIn(u'Created by test', r.text)
118 self.assertIn(u'Created by test', r.text)
118 self.assertIn(u'print', r.text)
119 self.assertIn(u'print', r.text)
119
120
120 r = self.nbconvert_api.from_post(format='python', nbmodel=nbmodel)
121 r = self.nbconvert_api.from_post(format='python', nbmodel=nbmodel)
121 self.assertIn(u'text/x-python', r.headers['Content-Type'])
122 self.assertIn(u'text/x-python', r.headers['Content-Type'])
122 self.assertIn(u'print(2*6)', r.text)
123 self.assertIn(u'print(2*6)', r.text)
123
124
124 @onlyif_cmds_exist('pandoc')
125 @onlyif_cmds_exist('pandoc')
125 def test_from_post_zip(self):
126 def test_from_post_zip(self):
126 nbmodel_url = url_path_join(self.base_url(), 'api/contents/foo/testnb.ipynb')
127 nbmodel_url = url_path_join(self.base_url(), 'api/contents/foo/testnb.ipynb')
127 nbmodel = requests.get(nbmodel_url).json()
128 nbmodel = requests.get(nbmodel_url).json()
128
129
129 r = self.nbconvert_api.from_post(format='latex', nbmodel=nbmodel)
130 r = self.nbconvert_api.from_post(format='latex', nbmodel=nbmodel)
130 self.assertIn(u'application/zip', r.headers['Content-Type'])
131 self.assertIn(u'application/zip', r.headers['Content-Type'])
131 self.assertIn(u'.zip', r.headers['Content-Disposition'])
132 self.assertIn(u'.zip', r.headers['Content-Disposition'])
@@ -1,545 +1,545
1 """A contents manager that uses the local file system for storage."""
1 """A contents manager that uses the local file system for storage."""
2
2
3 # Copyright (c) IPython Development Team.
3 # Copyright (c) IPython Development Team.
4 # Distributed under the terms of the Modified BSD License.
4 # Distributed under the terms of the Modified BSD License.
5
5
6 import base64
6 import base64
7 import io
7 import io
8 import os
8 import os
9 import glob
9 import glob
10 import shutil
10 import shutil
11
11
12 from tornado import web
12 from tornado import web
13
13
14 from .manager import ContentsManager
14 from .manager import ContentsManager
15 from IPython.nbformat import current
15 from IPython import nbformat
16 from IPython.utils.io import atomic_writing
16 from IPython.utils.io import atomic_writing
17 from IPython.utils.path import ensure_dir_exists
17 from IPython.utils.path import ensure_dir_exists
18 from IPython.utils.traitlets import Unicode, Bool, TraitError
18 from IPython.utils.traitlets import Unicode, Bool, TraitError
19 from IPython.utils.py3compat import getcwd
19 from IPython.utils.py3compat import getcwd
20 from IPython.utils import tz
20 from IPython.utils import tz
21 from IPython.html.utils import is_hidden, to_os_path, url_path_join
21 from IPython.html.utils import is_hidden, to_os_path, url_path_join
22
22
23
23
24 class FileContentsManager(ContentsManager):
24 class FileContentsManager(ContentsManager):
25
25
26 root_dir = Unicode(getcwd(), config=True)
26 root_dir = Unicode(getcwd(), config=True)
27
27
28 save_script = Bool(False, config=True, help='DEPRECATED, IGNORED')
28 save_script = Bool(False, config=True, help='DEPRECATED, IGNORED')
29 def _save_script_changed(self):
29 def _save_script_changed(self):
30 self.log.warn("""
30 self.log.warn("""
31 Automatically saving notebooks as scripts has been removed.
31 Automatically saving notebooks as scripts has been removed.
32 Use `ipython nbconvert --to python [notebook]` instead.
32 Use `ipython nbconvert --to python [notebook]` instead.
33 """)
33 """)
34
34
35 def _root_dir_changed(self, name, old, new):
35 def _root_dir_changed(self, name, old, new):
36 """Do a bit of validation of the root_dir."""
36 """Do a bit of validation of the root_dir."""
37 if not os.path.isabs(new):
37 if not os.path.isabs(new):
38 # If we receive a non-absolute path, make it absolute.
38 # If we receive a non-absolute path, make it absolute.
39 self.root_dir = os.path.abspath(new)
39 self.root_dir = os.path.abspath(new)
40 return
40 return
41 if not os.path.isdir(new):
41 if not os.path.isdir(new):
42 raise TraitError("%r is not a directory" % new)
42 raise TraitError("%r is not a directory" % new)
43
43
44 checkpoint_dir = Unicode('.ipynb_checkpoints', config=True,
44 checkpoint_dir = Unicode('.ipynb_checkpoints', config=True,
45 help="""The directory name in which to keep file checkpoints
45 help="""The directory name in which to keep file checkpoints
46
46
47 This is a path relative to the file's own directory.
47 This is a path relative to the file's own directory.
48
48
49 By default, it is .ipynb_checkpoints
49 By default, it is .ipynb_checkpoints
50 """
50 """
51 )
51 )
52
52
53 def _copy(self, src, dest):
53 def _copy(self, src, dest):
54 """copy src to dest
54 """copy src to dest
55
55
56 like shutil.copy2, but log errors in copystat
56 like shutil.copy2, but log errors in copystat
57 """
57 """
58 shutil.copyfile(src, dest)
58 shutil.copyfile(src, dest)
59 try:
59 try:
60 shutil.copystat(src, dest)
60 shutil.copystat(src, dest)
61 except OSError as e:
61 except OSError as e:
62 self.log.debug("copystat on %s failed", dest, exc_info=True)
62 self.log.debug("copystat on %s failed", dest, exc_info=True)
63
63
64 def _get_os_path(self, name=None, path=''):
64 def _get_os_path(self, name=None, path=''):
65 """Given a filename and API path, return its file system
65 """Given a filename and API path, return its file system
66 path.
66 path.
67
67
68 Parameters
68 Parameters
69 ----------
69 ----------
70 name : string
70 name : string
71 A filename
71 A filename
72 path : string
72 path : string
73 The relative API path to the named file.
73 The relative API path to the named file.
74
74
75 Returns
75 Returns
76 -------
76 -------
77 path : string
77 path : string
78 API path to be evaluated relative to root_dir.
78 API path to be evaluated relative to root_dir.
79 """
79 """
80 if name is not None:
80 if name is not None:
81 path = url_path_join(path, name)
81 path = url_path_join(path, name)
82 return to_os_path(path, self.root_dir)
82 return to_os_path(path, self.root_dir)
83
83
84 def path_exists(self, path):
84 def path_exists(self, path):
85 """Does the API-style path refer to an extant directory?
85 """Does the API-style path refer to an extant directory?
86
86
87 API-style wrapper for os.path.isdir
87 API-style wrapper for os.path.isdir
88
88
89 Parameters
89 Parameters
90 ----------
90 ----------
91 path : string
91 path : string
92 The path to check. This is an API path (`/` separated,
92 The path to check. This is an API path (`/` separated,
93 relative to root_dir).
93 relative to root_dir).
94
94
95 Returns
95 Returns
96 -------
96 -------
97 exists : bool
97 exists : bool
98 Whether the path is indeed a directory.
98 Whether the path is indeed a directory.
99 """
99 """
100 path = path.strip('/')
100 path = path.strip('/')
101 os_path = self._get_os_path(path=path)
101 os_path = self._get_os_path(path=path)
102 return os.path.isdir(os_path)
102 return os.path.isdir(os_path)
103
103
104 def is_hidden(self, path):
104 def is_hidden(self, path):
105 """Does the API style path correspond to a hidden directory or file?
105 """Does the API style path correspond to a hidden directory or file?
106
106
107 Parameters
107 Parameters
108 ----------
108 ----------
109 path : string
109 path : string
110 The path to check. This is an API path (`/` separated,
110 The path to check. This is an API path (`/` separated,
111 relative to root_dir).
111 relative to root_dir).
112
112
113 Returns
113 Returns
114 -------
114 -------
115 exists : bool
115 exists : bool
116 Whether the path is hidden.
116 Whether the path is hidden.
117
117
118 """
118 """
119 path = path.strip('/')
119 path = path.strip('/')
120 os_path = self._get_os_path(path=path)
120 os_path = self._get_os_path(path=path)
121 return is_hidden(os_path, self.root_dir)
121 return is_hidden(os_path, self.root_dir)
122
122
123 def file_exists(self, name, path=''):
123 def file_exists(self, name, path=''):
124 """Returns True if the file exists, else returns False.
124 """Returns True if the file exists, else returns False.
125
125
126 API-style wrapper for os.path.isfile
126 API-style wrapper for os.path.isfile
127
127
128 Parameters
128 Parameters
129 ----------
129 ----------
130 name : string
130 name : string
131 The name of the file you are checking.
131 The name of the file you are checking.
132 path : string
132 path : string
133 The relative path to the file's directory (with '/' as separator)
133 The relative path to the file's directory (with '/' as separator)
134
134
135 Returns
135 Returns
136 -------
136 -------
137 exists : bool
137 exists : bool
138 Whether the file exists.
138 Whether the file exists.
139 """
139 """
140 path = path.strip('/')
140 path = path.strip('/')
141 nbpath = self._get_os_path(name, path=path)
141 nbpath = self._get_os_path(name, path=path)
142 return os.path.isfile(nbpath)
142 return os.path.isfile(nbpath)
143
143
144 def exists(self, name=None, path=''):
144 def exists(self, name=None, path=''):
145 """Returns True if the path [and name] exists, else returns False.
145 """Returns True if the path [and name] exists, else returns False.
146
146
147 API-style wrapper for os.path.exists
147 API-style wrapper for os.path.exists
148
148
149 Parameters
149 Parameters
150 ----------
150 ----------
151 name : string
151 name : string
152 The name of the file you are checking.
152 The name of the file you are checking.
153 path : string
153 path : string
154 The relative path to the file's directory (with '/' as separator)
154 The relative path to the file's directory (with '/' as separator)
155
155
156 Returns
156 Returns
157 -------
157 -------
158 exists : bool
158 exists : bool
159 Whether the target exists.
159 Whether the target exists.
160 """
160 """
161 path = path.strip('/')
161 path = path.strip('/')
162 os_path = self._get_os_path(name, path=path)
162 os_path = self._get_os_path(name, path=path)
163 return os.path.exists(os_path)
163 return os.path.exists(os_path)
164
164
165 def _base_model(self, name, path=''):
165 def _base_model(self, name, path=''):
166 """Build the common base of a contents model"""
166 """Build the common base of a contents model"""
167 os_path = self._get_os_path(name, path)
167 os_path = self._get_os_path(name, path)
168 info = os.stat(os_path)
168 info = os.stat(os_path)
169 last_modified = tz.utcfromtimestamp(info.st_mtime)
169 last_modified = tz.utcfromtimestamp(info.st_mtime)
170 created = tz.utcfromtimestamp(info.st_ctime)
170 created = tz.utcfromtimestamp(info.st_ctime)
171 # Create the base model.
171 # Create the base model.
172 model = {}
172 model = {}
173 model['name'] = name
173 model['name'] = name
174 model['path'] = path
174 model['path'] = path
175 model['last_modified'] = last_modified
175 model['last_modified'] = last_modified
176 model['created'] = created
176 model['created'] = created
177 model['content'] = None
177 model['content'] = None
178 model['format'] = None
178 model['format'] = None
179 model['message'] = None
179 model['message'] = None
180 return model
180 return model
181
181
182 def _dir_model(self, name, path='', content=True):
182 def _dir_model(self, name, path='', content=True):
183 """Build a model for a directory
183 """Build a model for a directory
184
184
185 if content is requested, will include a listing of the directory
185 if content is requested, will include a listing of the directory
186 """
186 """
187 os_path = self._get_os_path(name, path)
187 os_path = self._get_os_path(name, path)
188
188
189 four_o_four = u'directory does not exist: %r' % os_path
189 four_o_four = u'directory does not exist: %r' % os_path
190
190
191 if not os.path.isdir(os_path):
191 if not os.path.isdir(os_path):
192 raise web.HTTPError(404, four_o_four)
192 raise web.HTTPError(404, four_o_four)
193 elif is_hidden(os_path, self.root_dir):
193 elif is_hidden(os_path, self.root_dir):
194 self.log.info("Refusing to serve hidden directory %r, via 404 Error",
194 self.log.info("Refusing to serve hidden directory %r, via 404 Error",
195 os_path
195 os_path
196 )
196 )
197 raise web.HTTPError(404, four_o_four)
197 raise web.HTTPError(404, four_o_four)
198
198
199 if name is None:
199 if name is None:
200 if '/' in path:
200 if '/' in path:
201 path, name = path.rsplit('/', 1)
201 path, name = path.rsplit('/', 1)
202 else:
202 else:
203 name = ''
203 name = ''
204 model = self._base_model(name, path)
204 model = self._base_model(name, path)
205 model['type'] = 'directory'
205 model['type'] = 'directory'
206 dir_path = u'{}/{}'.format(path, name)
206 dir_path = u'{}/{}'.format(path, name)
207 if content:
207 if content:
208 model['content'] = contents = []
208 model['content'] = contents = []
209 for os_path in glob.glob(self._get_os_path('*', dir_path)):
209 for os_path in glob.glob(self._get_os_path('*', dir_path)):
210 name = os.path.basename(os_path)
210 name = os.path.basename(os_path)
211 # skip over broken symlinks in listing
211 # skip over broken symlinks in listing
212 if not os.path.exists(os_path):
212 if not os.path.exists(os_path):
213 self.log.warn("%s doesn't exist", os_path)
213 self.log.warn("%s doesn't exist", os_path)
214 continue
214 continue
215 if self.should_list(name) and not is_hidden(os_path, self.root_dir):
215 if self.should_list(name) and not is_hidden(os_path, self.root_dir):
216 contents.append(self.get_model(name=name, path=dir_path, content=False))
216 contents.append(self.get_model(name=name, path=dir_path, content=False))
217
217
218 model['format'] = 'json'
218 model['format'] = 'json'
219
219
220 return model
220 return model
221
221
222 def _file_model(self, name, path='', content=True):
222 def _file_model(self, name, path='', content=True):
223 """Build a model for a file
223 """Build a model for a file
224
224
225 if content is requested, include the file contents.
225 if content is requested, include the file contents.
226 UTF-8 text files will be unicode, binary files will be base64-encoded.
226 UTF-8 text files will be unicode, binary files will be base64-encoded.
227 """
227 """
228 model = self._base_model(name, path)
228 model = self._base_model(name, path)
229 model['type'] = 'file'
229 model['type'] = 'file'
230 if content:
230 if content:
231 os_path = self._get_os_path(name, path)
231 os_path = self._get_os_path(name, path)
232 with io.open(os_path, 'rb') as f:
232 with io.open(os_path, 'rb') as f:
233 bcontent = f.read()
233 bcontent = f.read()
234 try:
234 try:
235 model['content'] = bcontent.decode('utf8')
235 model['content'] = bcontent.decode('utf8')
236 except UnicodeError as e:
236 except UnicodeError as e:
237 model['content'] = base64.encodestring(bcontent).decode('ascii')
237 model['content'] = base64.encodestring(bcontent).decode('ascii')
238 model['format'] = 'base64'
238 model['format'] = 'base64'
239 else:
239 else:
240 model['format'] = 'text'
240 model['format'] = 'text'
241 return model
241 return model
242
242
243
243
244 def _notebook_model(self, name, path='', content=True):
244 def _notebook_model(self, name, path='', content=True):
245 """Build a notebook model
245 """Build a notebook model
246
246
247 if content is requested, the notebook content will be populated
247 if content is requested, the notebook content will be populated
248 as a JSON structure (not double-serialized)
248 as a JSON structure (not double-serialized)
249 """
249 """
250 model = self._base_model(name, path)
250 model = self._base_model(name, path)
251 model['type'] = 'notebook'
251 model['type'] = 'notebook'
252 if content:
252 if content:
253 os_path = self._get_os_path(name, path)
253 os_path = self._get_os_path(name, path)
254 with io.open(os_path, 'r', encoding='utf-8') as f:
254 with io.open(os_path, 'r', encoding='utf-8') as f:
255 try:
255 try:
256 nb = current.read(f, u'json')
256 nb = nbformat.read(f, as_version=4)
257 except Exception as e:
257 except Exception as e:
258 raise web.HTTPError(400, u"Unreadable Notebook: %s %r" % (os_path, e))
258 raise web.HTTPError(400, u"Unreadable Notebook: %s %r" % (os_path, e))
259 self.mark_trusted_cells(nb, name, path)
259 self.mark_trusted_cells(nb, name, path)
260 model['content'] = nb
260 model['content'] = nb
261 model['format'] = 'json'
261 model['format'] = 'json'
262 self.validate_notebook_model(model)
262 self.validate_notebook_model(model)
263 return model
263 return model
264
264
265 def get_model(self, name, path='', content=True):
265 def get_model(self, name, path='', content=True):
266 """ Takes a path and name for an entity and returns its model
266 """ Takes a path and name for an entity and returns its model
267
267
268 Parameters
268 Parameters
269 ----------
269 ----------
270 name : str
270 name : str
271 the name of the target
271 the name of the target
272 path : str
272 path : str
273 the API path that describes the relative path for the target
273 the API path that describes the relative path for the target
274
274
275 Returns
275 Returns
276 -------
276 -------
277 model : dict
277 model : dict
278 the contents model. If content=True, returns the contents
278 the contents model. If content=True, returns the contents
279 of the file or directory as well.
279 of the file or directory as well.
280 """
280 """
281 path = path.strip('/')
281 path = path.strip('/')
282
282
283 if not self.exists(name=name, path=path):
283 if not self.exists(name=name, path=path):
284 raise web.HTTPError(404, u'No such file or directory: %s/%s' % (path, name))
284 raise web.HTTPError(404, u'No such file or directory: %s/%s' % (path, name))
285
285
286 os_path = self._get_os_path(name, path)
286 os_path = self._get_os_path(name, path)
287 if os.path.isdir(os_path):
287 if os.path.isdir(os_path):
288 model = self._dir_model(name, path, content)
288 model = self._dir_model(name, path, content)
289 elif name.endswith('.ipynb'):
289 elif name.endswith('.ipynb'):
290 model = self._notebook_model(name, path, content)
290 model = self._notebook_model(name, path, content)
291 else:
291 else:
292 model = self._file_model(name, path, content)
292 model = self._file_model(name, path, content)
293 return model
293 return model
294
294
295 def _save_notebook(self, os_path, model, name='', path=''):
295 def _save_notebook(self, os_path, model, name='', path=''):
296 """save a notebook file"""
296 """save a notebook file"""
297 # Save the notebook file
297 # Save the notebook file
298 nb = current.from_dict(model['content'])
298 nb = nbformat.from_dict(model['content'])
299
299
300 self.check_and_sign(nb, name, path)
300 self.check_and_sign(nb, name, path)
301
301
302 if 'name' in nb['metadata']:
302 if 'name' in nb['metadata']:
303 nb['metadata']['name'] = u''
303 nb['metadata']['name'] = u''
304
304
305 with atomic_writing(os_path, encoding='utf-8') as f:
305 with atomic_writing(os_path, encoding='utf-8') as f:
306 current.write(nb, f, version=nb.nbformat)
306 nbformat.write(f, nb, version=nbformat.NO_CONVERT)
307
307
308 def _save_file(self, os_path, model, name='', path=''):
308 def _save_file(self, os_path, model, name='', path=''):
309 """save a non-notebook file"""
309 """save a non-notebook file"""
310 fmt = model.get('format', None)
310 fmt = model.get('format', None)
311 if fmt not in {'text', 'base64'}:
311 if fmt not in {'text', 'base64'}:
312 raise web.HTTPError(400, "Must specify format of file contents as 'text' or 'base64'")
312 raise web.HTTPError(400, "Must specify format of file contents as 'text' or 'base64'")
313 try:
313 try:
314 content = model['content']
314 content = model['content']
315 if fmt == 'text':
315 if fmt == 'text':
316 bcontent = content.encode('utf8')
316 bcontent = content.encode('utf8')
317 else:
317 else:
318 b64_bytes = content.encode('ascii')
318 b64_bytes = content.encode('ascii')
319 bcontent = base64.decodestring(b64_bytes)
319 bcontent = base64.decodestring(b64_bytes)
320 except Exception as e:
320 except Exception as e:
321 raise web.HTTPError(400, u'Encoding error saving %s: %s' % (os_path, e))
321 raise web.HTTPError(400, u'Encoding error saving %s: %s' % (os_path, e))
322 with atomic_writing(os_path, text=False) as f:
322 with atomic_writing(os_path, text=False) as f:
323 f.write(bcontent)
323 f.write(bcontent)
324
324
325 def _save_directory(self, os_path, model, name='', path=''):
325 def _save_directory(self, os_path, model, name='', path=''):
326 """create a directory"""
326 """create a directory"""
327 if is_hidden(os_path, self.root_dir):
327 if is_hidden(os_path, self.root_dir):
328 raise web.HTTPError(400, u'Cannot create hidden directory %r' % os_path)
328 raise web.HTTPError(400, u'Cannot create hidden directory %r' % os_path)
329 if not os.path.exists(os_path):
329 if not os.path.exists(os_path):
330 os.mkdir(os_path)
330 os.mkdir(os_path)
331 elif not os.path.isdir(os_path):
331 elif not os.path.isdir(os_path):
332 raise web.HTTPError(400, u'Not a directory: %s' % (os_path))
332 raise web.HTTPError(400, u'Not a directory: %s' % (os_path))
333 else:
333 else:
334 self.log.debug("Directory %r already exists", os_path)
334 self.log.debug("Directory %r already exists", os_path)
335
335
336 def save(self, model, name='', path=''):
336 def save(self, model, name='', path=''):
337 """Save the file model and return the model with no content."""
337 """Save the file model and return the model with no content."""
338 path = path.strip('/')
338 path = path.strip('/')
339
339
340 if 'type' not in model:
340 if 'type' not in model:
341 raise web.HTTPError(400, u'No file type provided')
341 raise web.HTTPError(400, u'No file type provided')
342 if 'content' not in model and model['type'] != 'directory':
342 if 'content' not in model and model['type'] != 'directory':
343 raise web.HTTPError(400, u'No file content provided')
343 raise web.HTTPError(400, u'No file content provided')
344
344
345 # One checkpoint should always exist
345 # One checkpoint should always exist
346 if self.file_exists(name, path) and not self.list_checkpoints(name, path):
346 if self.file_exists(name, path) and not self.list_checkpoints(name, path):
347 self.create_checkpoint(name, path)
347 self.create_checkpoint(name, path)
348
348
349 new_path = model.get('path', path).strip('/')
349 new_path = model.get('path', path).strip('/')
350 new_name = model.get('name', name)
350 new_name = model.get('name', name)
351
351
352 if path != new_path or name != new_name:
352 if path != new_path or name != new_name:
353 self.rename(name, path, new_name, new_path)
353 self.rename(name, path, new_name, new_path)
354
354
355 os_path = self._get_os_path(new_name, new_path)
355 os_path = self._get_os_path(new_name, new_path)
356 self.log.debug("Saving %s", os_path)
356 self.log.debug("Saving %s", os_path)
357 try:
357 try:
358 if model['type'] == 'notebook':
358 if model['type'] == 'notebook':
359 self._save_notebook(os_path, model, new_name, new_path)
359 self._save_notebook(os_path, model, new_name, new_path)
360 elif model['type'] == 'file':
360 elif model['type'] == 'file':
361 self._save_file(os_path, model, new_name, new_path)
361 self._save_file(os_path, model, new_name, new_path)
362 elif model['type'] == 'directory':
362 elif model['type'] == 'directory':
363 self._save_directory(os_path, model, new_name, new_path)
363 self._save_directory(os_path, model, new_name, new_path)
364 else:
364 else:
365 raise web.HTTPError(400, "Unhandled contents type: %s" % model['type'])
365 raise web.HTTPError(400, "Unhandled contents type: %s" % model['type'])
366 except web.HTTPError:
366 except web.HTTPError:
367 raise
367 raise
368 except Exception as e:
368 except Exception as e:
369 raise web.HTTPError(400, u'Unexpected error while saving file: %s %s' % (os_path, e))
369 raise web.HTTPError(400, u'Unexpected error while saving file: %s %s' % (os_path, e))
370
370
371 validation_message = None
371 validation_message = None
372 if model['type'] == 'notebook':
372 if model['type'] == 'notebook':
373 self.validate_notebook_model(model)
373 self.validate_notebook_model(model)
374 validation_message = model.get('message', None)
374 validation_message = model.get('message', None)
375
375
376 model = self.get_model(new_name, new_path, content=False)
376 model = self.get_model(new_name, new_path, content=False)
377 if validation_message:
377 if validation_message:
378 model['message'] = validation_message
378 model['message'] = validation_message
379 return model
379 return model
380
380
381 def update(self, model, name, path=''):
381 def update(self, model, name, path=''):
382 """Update the file's path and/or name
382 """Update the file's path and/or name
383
383
384 For use in PATCH requests, to enable renaming a file without
384 For use in PATCH requests, to enable renaming a file without
385 re-uploading its contents. Only used for renaming at the moment.
385 re-uploading its contents. Only used for renaming at the moment.
386 """
386 """
387 path = path.strip('/')
387 path = path.strip('/')
388 new_name = model.get('name', name)
388 new_name = model.get('name', name)
389 new_path = model.get('path', path).strip('/')
389 new_path = model.get('path', path).strip('/')
390 if path != new_path or name != new_name:
390 if path != new_path or name != new_name:
391 self.rename(name, path, new_name, new_path)
391 self.rename(name, path, new_name, new_path)
392 model = self.get_model(new_name, new_path, content=False)
392 model = self.get_model(new_name, new_path, content=False)
393 return model
393 return model
394
394
395 def delete(self, name, path=''):
395 def delete(self, name, path=''):
396 """Delete file by name and path."""
396 """Delete file by name and path."""
397 path = path.strip('/')
397 path = path.strip('/')
398 os_path = self._get_os_path(name, path)
398 os_path = self._get_os_path(name, path)
399 rm = os.unlink
399 rm = os.unlink
400 if os.path.isdir(os_path):
400 if os.path.isdir(os_path):
401 listing = os.listdir(os_path)
401 listing = os.listdir(os_path)
402 # don't delete non-empty directories (checkpoints dir doesn't count)
402 # don't delete non-empty directories (checkpoints dir doesn't count)
403 if listing and listing != [self.checkpoint_dir]:
403 if listing and listing != [self.checkpoint_dir]:
404 raise web.HTTPError(400, u'Directory %s not empty' % os_path)
404 raise web.HTTPError(400, u'Directory %s not empty' % os_path)
405 elif not os.path.isfile(os_path):
405 elif not os.path.isfile(os_path):
406 raise web.HTTPError(404, u'File does not exist: %s' % os_path)
406 raise web.HTTPError(404, u'File does not exist: %s' % os_path)
407
407
408 # clear checkpoints
408 # clear checkpoints
409 for checkpoint in self.list_checkpoints(name, path):
409 for checkpoint in self.list_checkpoints(name, path):
410 checkpoint_id = checkpoint['id']
410 checkpoint_id = checkpoint['id']
411 cp_path = self.get_checkpoint_path(checkpoint_id, name, path)
411 cp_path = self.get_checkpoint_path(checkpoint_id, name, path)
412 if os.path.isfile(cp_path):
412 if os.path.isfile(cp_path):
413 self.log.debug("Unlinking checkpoint %s", cp_path)
413 self.log.debug("Unlinking checkpoint %s", cp_path)
414 os.unlink(cp_path)
414 os.unlink(cp_path)
415
415
416 if os.path.isdir(os_path):
416 if os.path.isdir(os_path):
417 self.log.debug("Removing directory %s", os_path)
417 self.log.debug("Removing directory %s", os_path)
418 shutil.rmtree(os_path)
418 shutil.rmtree(os_path)
419 else:
419 else:
420 self.log.debug("Unlinking file %s", os_path)
420 self.log.debug("Unlinking file %s", os_path)
421 rm(os_path)
421 rm(os_path)
422
422
423 def rename(self, old_name, old_path, new_name, new_path):
423 def rename(self, old_name, old_path, new_name, new_path):
424 """Rename a file."""
424 """Rename a file."""
425 old_path = old_path.strip('/')
425 old_path = old_path.strip('/')
426 new_path = new_path.strip('/')
426 new_path = new_path.strip('/')
427 if new_name == old_name and new_path == old_path:
427 if new_name == old_name and new_path == old_path:
428 return
428 return
429
429
430 new_os_path = self._get_os_path(new_name, new_path)
430 new_os_path = self._get_os_path(new_name, new_path)
431 old_os_path = self._get_os_path(old_name, old_path)
431 old_os_path = self._get_os_path(old_name, old_path)
432
432
433 # Should we proceed with the move?
433 # Should we proceed with the move?
434 if os.path.isfile(new_os_path):
434 if os.path.isfile(new_os_path):
435 raise web.HTTPError(409, u'File with name already exists: %s' % new_os_path)
435 raise web.HTTPError(409, u'File with name already exists: %s' % new_os_path)
436
436
437 # Move the file
437 # Move the file
438 try:
438 try:
439 shutil.move(old_os_path, new_os_path)
439 shutil.move(old_os_path, new_os_path)
440 except Exception as e:
440 except Exception as e:
441 raise web.HTTPError(500, u'Unknown error renaming file: %s %s' % (old_os_path, e))
441 raise web.HTTPError(500, u'Unknown error renaming file: %s %s' % (old_os_path, e))
442
442
443 # Move the checkpoints
443 # Move the checkpoints
444 old_checkpoints = self.list_checkpoints(old_name, old_path)
444 old_checkpoints = self.list_checkpoints(old_name, old_path)
445 for cp in old_checkpoints:
445 for cp in old_checkpoints:
446 checkpoint_id = cp['id']
446 checkpoint_id = cp['id']
447 old_cp_path = self.get_checkpoint_path(checkpoint_id, old_name, old_path)
447 old_cp_path = self.get_checkpoint_path(checkpoint_id, old_name, old_path)
448 new_cp_path = self.get_checkpoint_path(checkpoint_id, new_name, new_path)
448 new_cp_path = self.get_checkpoint_path(checkpoint_id, new_name, new_path)
449 if os.path.isfile(old_cp_path):
449 if os.path.isfile(old_cp_path):
450 self.log.debug("Renaming checkpoint %s -> %s", old_cp_path, new_cp_path)
450 self.log.debug("Renaming checkpoint %s -> %s", old_cp_path, new_cp_path)
451 shutil.move(old_cp_path, new_cp_path)
451 shutil.move(old_cp_path, new_cp_path)
452
452
453 # Checkpoint-related utilities
453 # Checkpoint-related utilities
454
454
455 def get_checkpoint_path(self, checkpoint_id, name, path=''):
455 def get_checkpoint_path(self, checkpoint_id, name, path=''):
456 """find the path to a checkpoint"""
456 """find the path to a checkpoint"""
457 path = path.strip('/')
457 path = path.strip('/')
458 basename, ext = os.path.splitext(name)
458 basename, ext = os.path.splitext(name)
459 filename = u"{name}-{checkpoint_id}{ext}".format(
459 filename = u"{name}-{checkpoint_id}{ext}".format(
460 name=basename,
460 name=basename,
461 checkpoint_id=checkpoint_id,
461 checkpoint_id=checkpoint_id,
462 ext=ext,
462 ext=ext,
463 )
463 )
464 os_path = self._get_os_path(path=path)
464 os_path = self._get_os_path(path=path)
465 cp_dir = os.path.join(os_path, self.checkpoint_dir)
465 cp_dir = os.path.join(os_path, self.checkpoint_dir)
466 ensure_dir_exists(cp_dir)
466 ensure_dir_exists(cp_dir)
467 cp_path = os.path.join(cp_dir, filename)
467 cp_path = os.path.join(cp_dir, filename)
468 return cp_path
468 return cp_path
469
469
470 def get_checkpoint_model(self, checkpoint_id, name, path=''):
470 def get_checkpoint_model(self, checkpoint_id, name, path=''):
471 """construct the info dict for a given checkpoint"""
471 """construct the info dict for a given checkpoint"""
472 path = path.strip('/')
472 path = path.strip('/')
473 cp_path = self.get_checkpoint_path(checkpoint_id, name, path)
473 cp_path = self.get_checkpoint_path(checkpoint_id, name, path)
474 stats = os.stat(cp_path)
474 stats = os.stat(cp_path)
475 last_modified = tz.utcfromtimestamp(stats.st_mtime)
475 last_modified = tz.utcfromtimestamp(stats.st_mtime)
476 info = dict(
476 info = dict(
477 id = checkpoint_id,
477 id = checkpoint_id,
478 last_modified = last_modified,
478 last_modified = last_modified,
479 )
479 )
480 return info
480 return info
481
481
482 # public checkpoint API
482 # public checkpoint API
483
483
484 def create_checkpoint(self, name, path=''):
484 def create_checkpoint(self, name, path=''):
485 """Create a checkpoint from the current state of a file"""
485 """Create a checkpoint from the current state of a file"""
486 path = path.strip('/')
486 path = path.strip('/')
487 src_path = self._get_os_path(name, path)
487 src_path = self._get_os_path(name, path)
488 # only the one checkpoint ID:
488 # only the one checkpoint ID:
489 checkpoint_id = u"checkpoint"
489 checkpoint_id = u"checkpoint"
490 cp_path = self.get_checkpoint_path(checkpoint_id, name, path)
490 cp_path = self.get_checkpoint_path(checkpoint_id, name, path)
491 self.log.debug("creating checkpoint for %s", name)
491 self.log.debug("creating checkpoint for %s", name)
492 self._copy(src_path, cp_path)
492 self._copy(src_path, cp_path)
493
493
494 # return the checkpoint info
494 # return the checkpoint info
495 return self.get_checkpoint_model(checkpoint_id, name, path)
495 return self.get_checkpoint_model(checkpoint_id, name, path)
496
496
497 def list_checkpoints(self, name, path=''):
497 def list_checkpoints(self, name, path=''):
498 """list the checkpoints for a given file
498 """list the checkpoints for a given file
499
499
500 This contents manager currently only supports one checkpoint per file.
500 This contents manager currently only supports one checkpoint per file.
501 """
501 """
502 path = path.strip('/')
502 path = path.strip('/')
503 checkpoint_id = "checkpoint"
503 checkpoint_id = "checkpoint"
504 os_path = self.get_checkpoint_path(checkpoint_id, name, path)
504 os_path = self.get_checkpoint_path(checkpoint_id, name, path)
505 if not os.path.exists(os_path):
505 if not os.path.exists(os_path):
506 return []
506 return []
507 else:
507 else:
508 return [self.get_checkpoint_model(checkpoint_id, name, path)]
508 return [self.get_checkpoint_model(checkpoint_id, name, path)]
509
509
510
510
511 def restore_checkpoint(self, checkpoint_id, name, path=''):
511 def restore_checkpoint(self, checkpoint_id, name, path=''):
512 """restore a file to a checkpointed state"""
512 """restore a file to a checkpointed state"""
513 path = path.strip('/')
513 path = path.strip('/')
514 self.log.info("restoring %s from checkpoint %s", name, checkpoint_id)
514 self.log.info("restoring %s from checkpoint %s", name, checkpoint_id)
515 nb_path = self._get_os_path(name, path)
515 nb_path = self._get_os_path(name, path)
516 cp_path = self.get_checkpoint_path(checkpoint_id, name, path)
516 cp_path = self.get_checkpoint_path(checkpoint_id, name, path)
517 if not os.path.isfile(cp_path):
517 if not os.path.isfile(cp_path):
518 self.log.debug("checkpoint file does not exist: %s", cp_path)
518 self.log.debug("checkpoint file does not exist: %s", cp_path)
519 raise web.HTTPError(404,
519 raise web.HTTPError(404,
520 u'checkpoint does not exist: %s-%s' % (name, checkpoint_id)
520 u'checkpoint does not exist: %s-%s' % (name, checkpoint_id)
521 )
521 )
522 # ensure notebook is readable (never restore from an unreadable notebook)
522 # ensure notebook is readable (never restore from an unreadable notebook)
523 if cp_path.endswith('.ipynb'):
523 if cp_path.endswith('.ipynb'):
524 with io.open(cp_path, 'r', encoding='utf-8') as f:
524 with io.open(cp_path, 'r', encoding='utf-8') as f:
525 current.read(f, u'json')
525 nbformat.read(f, as_version=4)
526 self._copy(cp_path, nb_path)
526 self._copy(cp_path, nb_path)
527 self.log.debug("copying %s -> %s", cp_path, nb_path)
527 self.log.debug("copying %s -> %s", cp_path, nb_path)
528
528
529 def delete_checkpoint(self, checkpoint_id, name, path=''):
529 def delete_checkpoint(self, checkpoint_id, name, path=''):
530 """delete a file's checkpoint"""
530 """delete a file's checkpoint"""
531 path = path.strip('/')
531 path = path.strip('/')
532 cp_path = self.get_checkpoint_path(checkpoint_id, name, path)
532 cp_path = self.get_checkpoint_path(checkpoint_id, name, path)
533 if not os.path.isfile(cp_path):
533 if not os.path.isfile(cp_path):
534 raise web.HTTPError(404,
534 raise web.HTTPError(404,
535 u'Checkpoint does not exist: %s%s-%s' % (path, name, checkpoint_id)
535 u'Checkpoint does not exist: %s%s-%s' % (path, name, checkpoint_id)
536 )
536 )
537 self.log.debug("unlinking %s", cp_path)
537 self.log.debug("unlinking %s", cp_path)
538 os.unlink(cp_path)
538 os.unlink(cp_path)
539
539
540 def info_string(self):
540 def info_string(self):
541 return "Serving notebooks from local directory: %s" % self.root_dir
541 return "Serving notebooks from local directory: %s" % self.root_dir
542
542
543 def get_kernel_path(self, name, path='', model=None):
543 def get_kernel_path(self, name, path='', model=None):
544 """Return the initial working dir a kernel associated with a given notebook"""
544 """Return the initial working dir a kernel associated with a given notebook"""
545 return os.path.join(self.root_dir, path)
545 return os.path.join(self.root_dir, path)
@@ -1,345 +1,347
1 """A base class for contents managers."""
1 """A base class for contents managers."""
2
2
3 # Copyright (c) IPython Development Team.
3 # Copyright (c) IPython Development Team.
4 # Distributed under the terms of the Modified BSD License.
4 # Distributed under the terms of the Modified BSD License.
5
5
6 from fnmatch import fnmatch
6 from fnmatch import fnmatch
7 import itertools
7 import itertools
8 import json
8 import json
9 import os
9 import os
10
10
11 from tornado.web import HTTPError
11 from tornado.web import HTTPError
12
12
13 from IPython.config.configurable import LoggingConfigurable
13 from IPython.config.configurable import LoggingConfigurable
14 from IPython.nbformat import current, sign
14 from IPython.nbformat import sign, validate, ValidationError
15 from IPython.nbformat.v4 import new_notebook
15 from IPython.utils.traitlets import Instance, Unicode, List
16 from IPython.utils.traitlets import Instance, Unicode, List
16
17
17
18
18 class ContentsManager(LoggingConfigurable):
19 class ContentsManager(LoggingConfigurable):
19 """Base class for serving files and directories.
20 """Base class for serving files and directories.
20
21
21 This serves any text or binary file,
22 This serves any text or binary file,
22 as well as directories,
23 as well as directories,
23 with special handling for JSON notebook documents.
24 with special handling for JSON notebook documents.
24
25
25 Most APIs take a path argument,
26 Most APIs take a path argument,
26 which is always an API-style unicode path,
27 which is always an API-style unicode path,
27 and always refers to a directory.
28 and always refers to a directory.
28
29
29 - unicode, not url-escaped
30 - unicode, not url-escaped
30 - '/'-separated
31 - '/'-separated
31 - leading and trailing '/' will be stripped
32 - leading and trailing '/' will be stripped
32 - if unspecified, path defaults to '',
33 - if unspecified, path defaults to '',
33 indicating the root path.
34 indicating the root path.
34
35
35 name is also unicode, and refers to a specfic target:
36 name is also unicode, and refers to a specfic target:
36
37
37 - unicode, not url-escaped
38 - unicode, not url-escaped
38 - must not contain '/'
39 - must not contain '/'
39 - It refers to an individual filename
40 - It refers to an individual filename
40 - It may refer to a directory name,
41 - It may refer to a directory name,
41 in the case of listing or creating directories.
42 in the case of listing or creating directories.
42
43
43 """
44 """
44
45
45 notary = Instance(sign.NotebookNotary)
46 notary = Instance(sign.NotebookNotary)
46 def _notary_default(self):
47 def _notary_default(self):
47 return sign.NotebookNotary(parent=self)
48 return sign.NotebookNotary(parent=self)
48
49
49 hide_globs = List(Unicode, [
50 hide_globs = List(Unicode, [
50 u'__pycache__', '*.pyc', '*.pyo',
51 u'__pycache__', '*.pyc', '*.pyo',
51 '.DS_Store', '*.so', '*.dylib', '*~',
52 '.DS_Store', '*.so', '*.dylib', '*~',
52 ], config=True, help="""
53 ], config=True, help="""
53 Glob patterns to hide in file and directory listings.
54 Glob patterns to hide in file and directory listings.
54 """)
55 """)
55
56
56 untitled_notebook = Unicode("Untitled", config=True,
57 untitled_notebook = Unicode("Untitled", config=True,
57 help="The base name used when creating untitled notebooks."
58 help="The base name used when creating untitled notebooks."
58 )
59 )
59
60
60 untitled_file = Unicode("untitled", config=True,
61 untitled_file = Unicode("untitled", config=True,
61 help="The base name used when creating untitled files."
62 help="The base name used when creating untitled files."
62 )
63 )
63
64
64 untitled_directory = Unicode("Untitled Folder", config=True,
65 untitled_directory = Unicode("Untitled Folder", config=True,
65 help="The base name used when creating untitled directories."
66 help="The base name used when creating untitled directories."
66 )
67 )
67
68
68 # ContentsManager API part 1: methods that must be
69 # ContentsManager API part 1: methods that must be
69 # implemented in subclasses.
70 # implemented in subclasses.
70
71
71 def path_exists(self, path):
72 def path_exists(self, path):
72 """Does the API-style path (directory) actually exist?
73 """Does the API-style path (directory) actually exist?
73
74
74 Like os.path.isdir
75 Like os.path.isdir
75
76
76 Override this method in subclasses.
77 Override this method in subclasses.
77
78
78 Parameters
79 Parameters
79 ----------
80 ----------
80 path : string
81 path : string
81 The path to check
82 The path to check
82
83
83 Returns
84 Returns
84 -------
85 -------
85 exists : bool
86 exists : bool
86 Whether the path does indeed exist.
87 Whether the path does indeed exist.
87 """
88 """
88 raise NotImplementedError
89 raise NotImplementedError
89
90
90 def is_hidden(self, path):
91 def is_hidden(self, path):
91 """Does the API style path correspond to a hidden directory or file?
92 """Does the API style path correspond to a hidden directory or file?
92
93
93 Parameters
94 Parameters
94 ----------
95 ----------
95 path : string
96 path : string
96 The path to check. This is an API path (`/` separated,
97 The path to check. This is an API path (`/` separated,
97 relative to root dir).
98 relative to root dir).
98
99
99 Returns
100 Returns
100 -------
101 -------
101 hidden : bool
102 hidden : bool
102 Whether the path is hidden.
103 Whether the path is hidden.
103
104
104 """
105 """
105 raise NotImplementedError
106 raise NotImplementedError
106
107
107 def file_exists(self, name, path=''):
108 def file_exists(self, name, path=''):
108 """Does a file exist at the given name and path?
109 """Does a file exist at the given name and path?
109
110
110 Like os.path.isfile
111 Like os.path.isfile
111
112
112 Override this method in subclasses.
113 Override this method in subclasses.
113
114
114 Parameters
115 Parameters
115 ----------
116 ----------
116 name : string
117 name : string
117 The name of the file you are checking.
118 The name of the file you are checking.
118 path : string
119 path : string
119 The relative path to the file's directory (with '/' as separator)
120 The relative path to the file's directory (with '/' as separator)
120
121
121 Returns
122 Returns
122 -------
123 -------
123 exists : bool
124 exists : bool
124 Whether the file exists.
125 Whether the file exists.
125 """
126 """
126 raise NotImplementedError('must be implemented in a subclass')
127 raise NotImplementedError('must be implemented in a subclass')
127
128
128 def exists(self, name, path=''):
129 def exists(self, name, path=''):
129 """Does a file or directory exist at the given name and path?
130 """Does a file or directory exist at the given name and path?
130
131
131 Like os.path.exists
132 Like os.path.exists
132
133
133 Parameters
134 Parameters
134 ----------
135 ----------
135 name : string
136 name : string
136 The name of the file you are checking.
137 The name of the file you are checking.
137 path : string
138 path : string
138 The relative path to the file's directory (with '/' as separator)
139 The relative path to the file's directory (with '/' as separator)
139
140
140 Returns
141 Returns
141 -------
142 -------
142 exists : bool
143 exists : bool
143 Whether the target exists.
144 Whether the target exists.
144 """
145 """
145 return self.file_exists(name, path) or self.path_exists("%s/%s" % (path, name))
146 return self.file_exists(name, path) or self.path_exists("%s/%s" % (path, name))
146
147
147 def get_model(self, name, path='', content=True):
148 def get_model(self, name, path='', content=True):
148 """Get the model of a file or directory with or without content."""
149 """Get the model of a file or directory with or without content."""
149 raise NotImplementedError('must be implemented in a subclass')
150 raise NotImplementedError('must be implemented in a subclass')
150
151
151 def save(self, model, name, path=''):
152 def save(self, model, name, path=''):
152 """Save the file or directory and return the model with no content."""
153 """Save the file or directory and return the model with no content."""
153 raise NotImplementedError('must be implemented in a subclass')
154 raise NotImplementedError('must be implemented in a subclass')
154
155
155 def update(self, model, name, path=''):
156 def update(self, model, name, path=''):
156 """Update the file or directory and return the model with no content.
157 """Update the file or directory and return the model with no content.
157
158
158 For use in PATCH requests, to enable renaming a file without
159 For use in PATCH requests, to enable renaming a file without
159 re-uploading its contents. Only used for renaming at the moment.
160 re-uploading its contents. Only used for renaming at the moment.
160 """
161 """
161 raise NotImplementedError('must be implemented in a subclass')
162 raise NotImplementedError('must be implemented in a subclass')
162
163
163 def delete(self, name, path=''):
164 def delete(self, name, path=''):
164 """Delete file or directory by name and path."""
165 """Delete file or directory by name and path."""
165 raise NotImplementedError('must be implemented in a subclass')
166 raise NotImplementedError('must be implemented in a subclass')
166
167
167 def create_checkpoint(self, name, path=''):
168 def create_checkpoint(self, name, path=''):
168 """Create a checkpoint of the current state of a file
169 """Create a checkpoint of the current state of a file
169
170
170 Returns a checkpoint_id for the new checkpoint.
171 Returns a checkpoint_id for the new checkpoint.
171 """
172 """
172 raise NotImplementedError("must be implemented in a subclass")
173 raise NotImplementedError("must be implemented in a subclass")
173
174
174 def list_checkpoints(self, name, path=''):
175 def list_checkpoints(self, name, path=''):
175 """Return a list of checkpoints for a given file"""
176 """Return a list of checkpoints for a given file"""
176 return []
177 return []
177
178
178 def restore_checkpoint(self, checkpoint_id, name, path=''):
179 def restore_checkpoint(self, checkpoint_id, name, path=''):
179 """Restore a file from one of its checkpoints"""
180 """Restore a file from one of its checkpoints"""
180 raise NotImplementedError("must be implemented in a subclass")
181 raise NotImplementedError("must be implemented in a subclass")
181
182
182 def delete_checkpoint(self, checkpoint_id, name, path=''):
183 def delete_checkpoint(self, checkpoint_id, name, path=''):
183 """delete a checkpoint for a file"""
184 """delete a checkpoint for a file"""
184 raise NotImplementedError("must be implemented in a subclass")
185 raise NotImplementedError("must be implemented in a subclass")
185
186
186 # ContentsManager API part 2: methods that have useable default
187 # ContentsManager API part 2: methods that have useable default
187 # implementations, but can be overridden in subclasses.
188 # implementations, but can be overridden in subclasses.
188
189
189 def info_string(self):
190 def info_string(self):
190 return "Serving contents"
191 return "Serving contents"
191
192
192 def get_kernel_path(self, name, path='', model=None):
193 def get_kernel_path(self, name, path='', model=None):
193 """ Return the path to start kernel in """
194 """ Return the path to start kernel in """
194 return path
195 return path
195
196
196 def increment_filename(self, filename, path=''):
197 def increment_filename(self, filename, path=''):
197 """Increment a filename until it is unique.
198 """Increment a filename until it is unique.
198
199
199 Parameters
200 Parameters
200 ----------
201 ----------
201 filename : unicode
202 filename : unicode
202 The name of a file, including extension
203 The name of a file, including extension
203 path : unicode
204 path : unicode
204 The API path of the target's directory
205 The API path of the target's directory
205
206
206 Returns
207 Returns
207 -------
208 -------
208 name : unicode
209 name : unicode
209 A filename that is unique, based on the input filename.
210 A filename that is unique, based on the input filename.
210 """
211 """
211 path = path.strip('/')
212 path = path.strip('/')
212 basename, ext = os.path.splitext(filename)
213 basename, ext = os.path.splitext(filename)
213 for i in itertools.count():
214 for i in itertools.count():
214 name = u'{basename}{i}{ext}'.format(basename=basename, i=i,
215 name = u'{basename}{i}{ext}'.format(basename=basename, i=i,
215 ext=ext)
216 ext=ext)
216 if not self.file_exists(name, path):
217 if not self.file_exists(name, path):
217 break
218 break
218 return name
219 return name
219
220
220 def validate_notebook_model(self, model):
221 def validate_notebook_model(self, model):
221 """Add failed-validation message to model"""
222 """Add failed-validation message to model"""
222 try:
223 try:
223 current.validate(model['content'])
224 validate(model['content'])
224 except current.ValidationError as e:
225 except ValidationError as e:
225 model['message'] = 'Notebook Validation failed: {}:\n{}'.format(
226 model['message'] = 'Notebook Validation failed: {}:\n{}'.format(
226 e.message, json.dumps(e.instance, indent=1, default=lambda obj: '<UNKNOWN>'),
227 e.message, json.dumps(e.instance, indent=1, default=lambda obj: '<UNKNOWN>'),
227 )
228 )
228 return model
229 return model
229
230
230 def create_file(self, model=None, path='', ext='.ipynb'):
231 def create_file(self, model=None, path='', ext='.ipynb'):
231 """Create a new file or directory and return its model with no content."""
232 """Create a new file or directory and return its model with no content."""
232 path = path.strip('/')
233 path = path.strip('/')
233 if model is None:
234 if model is None:
234 model = {}
235 model = {}
235 if 'content' not in model and model.get('type', None) != 'directory':
236 if 'content' not in model and model.get('type', None) != 'directory':
236 if ext == '.ipynb':
237 if ext == '.ipynb':
237 model['content'] = current.new_notebook()
238 model['content'] = new_notebook()
238 model['type'] = 'notebook'
239 model['type'] = 'notebook'
239 model['format'] = 'json'
240 model['format'] = 'json'
240 else:
241 else:
241 model['content'] = ''
242 model['content'] = ''
242 model['type'] = 'file'
243 model['type'] = 'file'
243 model['format'] = 'text'
244 model['format'] = 'text'
244 if 'name' not in model:
245 if 'name' not in model:
245 if model['type'] == 'directory':
246 if model['type'] == 'directory':
246 untitled = self.untitled_directory
247 untitled = self.untitled_directory
247 elif model['type'] == 'notebook':
248 elif model['type'] == 'notebook':
248 untitled = self.untitled_notebook
249 untitled = self.untitled_notebook
249 elif model['type'] == 'file':
250 elif model['type'] == 'file':
250 untitled = self.untitled_file
251 untitled = self.untitled_file
251 else:
252 else:
252 raise HTTPError(400, "Unexpected model type: %r" % model['type'])
253 raise HTTPError(400, "Unexpected model type: %r" % model['type'])
253 model['name'] = self.increment_filename(untitled + ext, path)
254 model['name'] = self.increment_filename(untitled + ext, path)
254
255
255 model['path'] = path
256 model['path'] = path
256 model = self.save(model, model['name'], model['path'])
257 model = self.save(model, model['name'], model['path'])
257 return model
258 return model
258
259
259 def copy(self, from_name, to_name=None, path=''):
260 def copy(self, from_name, to_name=None, path=''):
260 """Copy an existing file and return its new model.
261 """Copy an existing file and return its new model.
261
262
262 If to_name not specified, increment `from_name-Copy#.ext`.
263 If to_name not specified, increment `from_name-Copy#.ext`.
263
264
264 copy_from can be a full path to a file,
265 copy_from can be a full path to a file,
265 or just a base name. If a base name, `path` is used.
266 or just a base name. If a base name, `path` is used.
266 """
267 """
267 path = path.strip('/')
268 path = path.strip('/')
268 if '/' in from_name:
269 if '/' in from_name:
269 from_path, from_name = from_name.rsplit('/', 1)
270 from_path, from_name = from_name.rsplit('/', 1)
270 else:
271 else:
271 from_path = path
272 from_path = path
272 model = self.get_model(from_name, from_path)
273 model = self.get_model(from_name, from_path)
273 if model['type'] == 'directory':
274 if model['type'] == 'directory':
274 raise HTTPError(400, "Can't copy directories")
275 raise HTTPError(400, "Can't copy directories")
275 if not to_name:
276 if not to_name:
276 base, ext = os.path.splitext(from_name)
277 base, ext = os.path.splitext(from_name)
277 copy_name = u'{0}-Copy{1}'.format(base, ext)
278 copy_name = u'{0}-Copy{1}'.format(base, ext)
278 to_name = self.increment_filename(copy_name, path)
279 to_name = self.increment_filename(copy_name, path)
279 model['name'] = to_name
280 model['name'] = to_name
280 model['path'] = path
281 model['path'] = path
281 model = self.save(model, to_name, path)
282 model = self.save(model, to_name, path)
282 return model
283 return model
283
284
284 def log_info(self):
285 def log_info(self):
285 self.log.info(self.info_string())
286 self.log.info(self.info_string())
286
287
287 def trust_notebook(self, name, path=''):
288 def trust_notebook(self, name, path=''):
288 """Explicitly trust a notebook
289 """Explicitly trust a notebook
289
290
290 Parameters
291 Parameters
291 ----------
292 ----------
292 name : string
293 name : string
293 The filename of the notebook
294 The filename of the notebook
294 path : string
295 path : string
295 The notebook's directory
296 The notebook's directory
296 """
297 """
297 model = self.get_model(name, path)
298 model = self.get_model(name, path)
298 nb = model['content']
299 nb = model['content']
299 self.log.warn("Trusting notebook %s/%s", path, name)
300 self.log.warn("Trusting notebook %s/%s", path, name)
300 self.notary.mark_cells(nb, True)
301 self.notary.mark_cells(nb, True)
301 self.save(model, name, path)
302 self.save(model, name, path)
302
303
303 def check_and_sign(self, nb, name='', path=''):
304 def check_and_sign(self, nb, name='', path=''):
304 """Check for trusted cells, and sign the notebook.
305 """Check for trusted cells, and sign the notebook.
305
306
306 Called as a part of saving notebooks.
307 Called as a part of saving notebooks.
307
308
308 Parameters
309 Parameters
309 ----------
310 ----------
310 nb : dict
311 nb : dict
311 The notebook object (in nbformat.current format)
312 The notebook object (in current nbformat)
312 name : string
313 name : string
313 The filename of the notebook (for logging)
314 The filename of the notebook (for logging)
314 path : string
315 path : string
315 The notebook's directory (for logging)
316 The notebook's directory (for logging)
316 """
317 """
317 if nb['nbformat'] != current.nbformat:
318 # can't sign old notebooks
319 if nb['nbformat'] != 4:
318 return
320 return
319 if self.notary.check_cells(nb):
321 if self.notary.check_cells(nb):
320 self.notary.sign(nb)
322 self.notary.sign(nb)
321 else:
323 else:
322 self.log.warn("Saving untrusted notebook %s/%s", path, name)
324 self.log.warn("Saving untrusted notebook %s/%s", path, name)
323
325
324 def mark_trusted_cells(self, nb, name='', path=''):
326 def mark_trusted_cells(self, nb, name='', path=''):
325 """Mark cells as trusted if the notebook signature matches.
327 """Mark cells as trusted if the notebook signature matches.
326
328
327 Called as a part of loading notebooks.
329 Called as a part of loading notebooks.
328
330
329 Parameters
331 Parameters
330 ----------
332 ----------
331 nb : dict
333 nb : dict
332 The notebook object (in nbformat.current format)
334 The notebook object (in current nbformat)
333 name : string
335 name : string
334 The filename of the notebook (for logging)
336 The filename of the notebook (for logging)
335 path : string
337 path : string
336 The notebook's directory (for logging)
338 The notebook's directory (for logging)
337 """
339 """
338 trusted = self.notary.check_signature(nb)
340 trusted = self.notary.check_signature(nb)
339 if not trusted:
341 if not trusted:
340 self.log.warn("Notebook %s/%s is not trusted", path, name)
342 self.log.warn("Notebook %s/%s is not trusted", path, name)
341 self.notary.mark_cells(nb, trusted)
343 self.notary.mark_cells(nb, trusted)
342
344
343 def should_list(self, name):
345 def should_list(self, name):
344 """Should this file/directory name be displayed in a listing?"""
346 """Should this file/directory name be displayed in a listing?"""
345 return not any(fnmatch(name, glob) for glob in self.hide_globs)
347 return not any(fnmatch(name, glob) for glob in self.hide_globs)
@@ -1,480 +1,481
1 # coding: utf-8
1 # coding: utf-8
2 """Test the contents webservice API."""
2 """Test the contents webservice API."""
3
3
4 import base64
4 import base64
5 import io
5 import io
6 import json
6 import json
7 import os
7 import os
8 import shutil
8 import shutil
9 from unicodedata import normalize
9 from unicodedata import normalize
10
10
11 pjoin = os.path.join
11 pjoin = os.path.join
12
12
13 import requests
13 import requests
14
14
15 from IPython.html.utils import url_path_join, url_escape
15 from IPython.html.utils import url_path_join, url_escape
16 from IPython.html.tests.launchnotebook import NotebookTestBase, assert_http_error
16 from IPython.html.tests.launchnotebook import NotebookTestBase, assert_http_error
17 from IPython.nbformat import current
17 from IPython.nbformat import read, write, from_dict
18 from IPython.nbformat.current import (new_notebook, write, read,
18 from IPython.nbformat.v4 import (
19 new_markdown_cell, from_dict)
19 new_notebook, new_markdown_cell,
20 )
20 from IPython.nbformat import v2
21 from IPython.nbformat import v2
21 from IPython.utils import py3compat
22 from IPython.utils import py3compat
22 from IPython.utils.data import uniq_stable
23 from IPython.utils.data import uniq_stable
23
24
24
25
25 def notebooks_only(dir_model):
26 def notebooks_only(dir_model):
26 return [nb for nb in dir_model['content'] if nb['type']=='notebook']
27 return [nb for nb in dir_model['content'] if nb['type']=='notebook']
27
28
28 def dirs_only(dir_model):
29 def dirs_only(dir_model):
29 return [x for x in dir_model['content'] if x['type']=='directory']
30 return [x for x in dir_model['content'] if x['type']=='directory']
30
31
31
32
32 class API(object):
33 class API(object):
33 """Wrapper for contents API calls."""
34 """Wrapper for contents API calls."""
34 def __init__(self, base_url):
35 def __init__(self, base_url):
35 self.base_url = base_url
36 self.base_url = base_url
36
37
37 def _req(self, verb, path, body=None):
38 def _req(self, verb, path, body=None):
38 response = requests.request(verb,
39 response = requests.request(verb,
39 url_path_join(self.base_url, 'api/contents', path),
40 url_path_join(self.base_url, 'api/contents', path),
40 data=body,
41 data=body,
41 )
42 )
42 response.raise_for_status()
43 response.raise_for_status()
43 return response
44 return response
44
45
45 def list(self, path='/'):
46 def list(self, path='/'):
46 return self._req('GET', path)
47 return self._req('GET', path)
47
48
48 def read(self, name, path='/'):
49 def read(self, name, path='/'):
49 return self._req('GET', url_path_join(path, name))
50 return self._req('GET', url_path_join(path, name))
50
51
51 def create_untitled(self, path='/', ext=None):
52 def create_untitled(self, path='/', ext=None):
52 body = None
53 body = None
53 if ext:
54 if ext:
54 body = json.dumps({'ext': ext})
55 body = json.dumps({'ext': ext})
55 return self._req('POST', path, body)
56 return self._req('POST', path, body)
56
57
57 def upload_untitled(self, body, path='/'):
58 def upload_untitled(self, body, path='/'):
58 return self._req('POST', path, body)
59 return self._req('POST', path, body)
59
60
60 def copy_untitled(self, copy_from, path='/'):
61 def copy_untitled(self, copy_from, path='/'):
61 body = json.dumps({'copy_from':copy_from})
62 body = json.dumps({'copy_from':copy_from})
62 return self._req('POST', path, body)
63 return self._req('POST', path, body)
63
64
64 def create(self, name, path='/'):
65 def create(self, name, path='/'):
65 return self._req('PUT', url_path_join(path, name))
66 return self._req('PUT', url_path_join(path, name))
66
67
67 def upload(self, name, body, path='/'):
68 def upload(self, name, body, path='/'):
68 return self._req('PUT', url_path_join(path, name), body)
69 return self._req('PUT', url_path_join(path, name), body)
69
70
70 def mkdir(self, name, path='/'):
71 def mkdir(self, name, path='/'):
71 return self._req('PUT', url_path_join(path, name), json.dumps({'type': 'directory'}))
72 return self._req('PUT', url_path_join(path, name), json.dumps({'type': 'directory'}))
72
73
73 def copy(self, copy_from, copy_to, path='/'):
74 def copy(self, copy_from, copy_to, path='/'):
74 body = json.dumps({'copy_from':copy_from})
75 body = json.dumps({'copy_from':copy_from})
75 return self._req('PUT', url_path_join(path, copy_to), body)
76 return self._req('PUT', url_path_join(path, copy_to), body)
76
77
77 def save(self, name, body, path='/'):
78 def save(self, name, body, path='/'):
78 return self._req('PUT', url_path_join(path, name), body)
79 return self._req('PUT', url_path_join(path, name), body)
79
80
80 def delete(self, name, path='/'):
81 def delete(self, name, path='/'):
81 return self._req('DELETE', url_path_join(path, name))
82 return self._req('DELETE', url_path_join(path, name))
82
83
83 def rename(self, name, path, new_name):
84 def rename(self, name, path, new_name):
84 body = json.dumps({'name': new_name})
85 body = json.dumps({'name': new_name})
85 return self._req('PATCH', url_path_join(path, name), body)
86 return self._req('PATCH', url_path_join(path, name), body)
86
87
87 def get_checkpoints(self, name, path):
88 def get_checkpoints(self, name, path):
88 return self._req('GET', url_path_join(path, name, 'checkpoints'))
89 return self._req('GET', url_path_join(path, name, 'checkpoints'))
89
90
90 def new_checkpoint(self, name, path):
91 def new_checkpoint(self, name, path):
91 return self._req('POST', url_path_join(path, name, 'checkpoints'))
92 return self._req('POST', url_path_join(path, name, 'checkpoints'))
92
93
93 def restore_checkpoint(self, name, path, checkpoint_id):
94 def restore_checkpoint(self, name, path, checkpoint_id):
94 return self._req('POST', url_path_join(path, name, 'checkpoints', checkpoint_id))
95 return self._req('POST', url_path_join(path, name, 'checkpoints', checkpoint_id))
95
96
96 def delete_checkpoint(self, name, path, checkpoint_id):
97 def delete_checkpoint(self, name, path, checkpoint_id):
97 return self._req('DELETE', url_path_join(path, name, 'checkpoints', checkpoint_id))
98 return self._req('DELETE', url_path_join(path, name, 'checkpoints', checkpoint_id))
98
99
99 class APITest(NotebookTestBase):
100 class APITest(NotebookTestBase):
100 """Test the kernels web service API"""
101 """Test the kernels web service API"""
101 dirs_nbs = [('', 'inroot'),
102 dirs_nbs = [('', 'inroot'),
102 ('Directory with spaces in', 'inspace'),
103 ('Directory with spaces in', 'inspace'),
103 (u'unicodé', 'innonascii'),
104 (u'unicodé', 'innonascii'),
104 ('foo', 'a'),
105 ('foo', 'a'),
105 ('foo', 'b'),
106 ('foo', 'b'),
106 ('foo', 'name with spaces'),
107 ('foo', 'name with spaces'),
107 ('foo', u'unicodé'),
108 ('foo', u'unicodé'),
108 ('foo/bar', 'baz'),
109 ('foo/bar', 'baz'),
109 ('ordering', 'A'),
110 ('ordering', 'A'),
110 ('ordering', 'b'),
111 ('ordering', 'b'),
111 ('ordering', 'C'),
112 ('ordering', 'C'),
112 (u'å b', u'ç d'),
113 (u'å b', u'ç d'),
113 ]
114 ]
114 hidden_dirs = ['.hidden', '__pycache__']
115 hidden_dirs = ['.hidden', '__pycache__']
115
116
116 dirs = uniq_stable([py3compat.cast_unicode(d) for (d,n) in dirs_nbs])
117 dirs = uniq_stable([py3compat.cast_unicode(d) for (d,n) in dirs_nbs])
117 del dirs[0] # remove ''
118 del dirs[0] # remove ''
118 top_level_dirs = {normalize('NFC', d.split('/')[0]) for d in dirs}
119 top_level_dirs = {normalize('NFC', d.split('/')[0]) for d in dirs}
119
120
120 @staticmethod
121 @staticmethod
121 def _blob_for_name(name):
122 def _blob_for_name(name):
122 return name.encode('utf-8') + b'\xFF'
123 return name.encode('utf-8') + b'\xFF'
123
124
124 @staticmethod
125 @staticmethod
125 def _txt_for_name(name):
126 def _txt_for_name(name):
126 return u'%s text file' % name
127 return u'%s text file' % name
127
128
128 def setUp(self):
129 def setUp(self):
129 nbdir = self.notebook_dir.name
130 nbdir = self.notebook_dir.name
130 self.blob = os.urandom(100)
131 self.blob = os.urandom(100)
131 self.b64_blob = base64.encodestring(self.blob).decode('ascii')
132 self.b64_blob = base64.encodestring(self.blob).decode('ascii')
132
133
133
134
134
135
135 for d in (self.dirs + self.hidden_dirs):
136 for d in (self.dirs + self.hidden_dirs):
136 d.replace('/', os.sep)
137 d.replace('/', os.sep)
137 if not os.path.isdir(pjoin(nbdir, d)):
138 if not os.path.isdir(pjoin(nbdir, d)):
138 os.mkdir(pjoin(nbdir, d))
139 os.mkdir(pjoin(nbdir, d))
139
140
140 for d, name in self.dirs_nbs:
141 for d, name in self.dirs_nbs:
141 d = d.replace('/', os.sep)
142 d = d.replace('/', os.sep)
142 # create a notebook
143 # create a notebook
143 with io.open(pjoin(nbdir, d, '%s.ipynb' % name), 'w',
144 with io.open(pjoin(nbdir, d, '%s.ipynb' % name), 'w',
144 encoding='utf-8') as f:
145 encoding='utf-8') as f:
145 nb = new_notebook()
146 nb = new_notebook()
146 write(nb, f, format='ipynb')
147 write(f, nb, version=4)
147
148
148 # create a text file
149 # create a text file
149 with io.open(pjoin(nbdir, d, '%s.txt' % name), 'w',
150 with io.open(pjoin(nbdir, d, '%s.txt' % name), 'w',
150 encoding='utf-8') as f:
151 encoding='utf-8') as f:
151 f.write(self._txt_for_name(name))
152 f.write(self._txt_for_name(name))
152
153
153 # create a binary file
154 # create a binary file
154 with io.open(pjoin(nbdir, d, '%s.blob' % name), 'wb') as f:
155 with io.open(pjoin(nbdir, d, '%s.blob' % name), 'wb') as f:
155 f.write(self._blob_for_name(name))
156 f.write(self._blob_for_name(name))
156
157
157 self.api = API(self.base_url())
158 self.api = API(self.base_url())
158
159
159 def tearDown(self):
160 def tearDown(self):
160 nbdir = self.notebook_dir.name
161 nbdir = self.notebook_dir.name
161
162
162 for dname in (list(self.top_level_dirs) + self.hidden_dirs):
163 for dname in (list(self.top_level_dirs) + self.hidden_dirs):
163 shutil.rmtree(pjoin(nbdir, dname), ignore_errors=True)
164 shutil.rmtree(pjoin(nbdir, dname), ignore_errors=True)
164
165
165 if os.path.isfile(pjoin(nbdir, 'inroot.ipynb')):
166 if os.path.isfile(pjoin(nbdir, 'inroot.ipynb')):
166 os.unlink(pjoin(nbdir, 'inroot.ipynb'))
167 os.unlink(pjoin(nbdir, 'inroot.ipynb'))
167
168
168 def test_list_notebooks(self):
169 def test_list_notebooks(self):
169 nbs = notebooks_only(self.api.list().json())
170 nbs = notebooks_only(self.api.list().json())
170 self.assertEqual(len(nbs), 1)
171 self.assertEqual(len(nbs), 1)
171 self.assertEqual(nbs[0]['name'], 'inroot.ipynb')
172 self.assertEqual(nbs[0]['name'], 'inroot.ipynb')
172
173
173 nbs = notebooks_only(self.api.list('/Directory with spaces in/').json())
174 nbs = notebooks_only(self.api.list('/Directory with spaces in/').json())
174 self.assertEqual(len(nbs), 1)
175 self.assertEqual(len(nbs), 1)
175 self.assertEqual(nbs[0]['name'], 'inspace.ipynb')
176 self.assertEqual(nbs[0]['name'], 'inspace.ipynb')
176
177
177 nbs = notebooks_only(self.api.list(u'/unicodé/').json())
178 nbs = notebooks_only(self.api.list(u'/unicodé/').json())
178 self.assertEqual(len(nbs), 1)
179 self.assertEqual(len(nbs), 1)
179 self.assertEqual(nbs[0]['name'], 'innonascii.ipynb')
180 self.assertEqual(nbs[0]['name'], 'innonascii.ipynb')
180 self.assertEqual(nbs[0]['path'], u'unicodé')
181 self.assertEqual(nbs[0]['path'], u'unicodé')
181
182
182 nbs = notebooks_only(self.api.list('/foo/bar/').json())
183 nbs = notebooks_only(self.api.list('/foo/bar/').json())
183 self.assertEqual(len(nbs), 1)
184 self.assertEqual(len(nbs), 1)
184 self.assertEqual(nbs[0]['name'], 'baz.ipynb')
185 self.assertEqual(nbs[0]['name'], 'baz.ipynb')
185 self.assertEqual(nbs[0]['path'], 'foo/bar')
186 self.assertEqual(nbs[0]['path'], 'foo/bar')
186
187
187 nbs = notebooks_only(self.api.list('foo').json())
188 nbs = notebooks_only(self.api.list('foo').json())
188 self.assertEqual(len(nbs), 4)
189 self.assertEqual(len(nbs), 4)
189 nbnames = { normalize('NFC', n['name']) for n in nbs }
190 nbnames = { normalize('NFC', n['name']) for n in nbs }
190 expected = [ u'a.ipynb', u'b.ipynb', u'name with spaces.ipynb', u'unicodé.ipynb']
191 expected = [ u'a.ipynb', u'b.ipynb', u'name with spaces.ipynb', u'unicodé.ipynb']
191 expected = { normalize('NFC', name) for name in expected }
192 expected = { normalize('NFC', name) for name in expected }
192 self.assertEqual(nbnames, expected)
193 self.assertEqual(nbnames, expected)
193
194
194 nbs = notebooks_only(self.api.list('ordering').json())
195 nbs = notebooks_only(self.api.list('ordering').json())
195 nbnames = [n['name'] for n in nbs]
196 nbnames = [n['name'] for n in nbs]
196 expected = ['A.ipynb', 'b.ipynb', 'C.ipynb']
197 expected = ['A.ipynb', 'b.ipynb', 'C.ipynb']
197 self.assertEqual(nbnames, expected)
198 self.assertEqual(nbnames, expected)
198
199
199 def test_list_dirs(self):
200 def test_list_dirs(self):
200 dirs = dirs_only(self.api.list().json())
201 dirs = dirs_only(self.api.list().json())
201 dir_names = {normalize('NFC', d['name']) for d in dirs}
202 dir_names = {normalize('NFC', d['name']) for d in dirs}
202 self.assertEqual(dir_names, self.top_level_dirs) # Excluding hidden dirs
203 self.assertEqual(dir_names, self.top_level_dirs) # Excluding hidden dirs
203
204
204 def test_list_nonexistant_dir(self):
205 def test_list_nonexistant_dir(self):
205 with assert_http_error(404):
206 with assert_http_error(404):
206 self.api.list('nonexistant')
207 self.api.list('nonexistant')
207
208
208 def test_get_nb_contents(self):
209 def test_get_nb_contents(self):
209 for d, name in self.dirs_nbs:
210 for d, name in self.dirs_nbs:
210 nb = self.api.read('%s.ipynb' % name, d+'/').json()
211 nb = self.api.read('%s.ipynb' % name, d+'/').json()
211 self.assertEqual(nb['name'], u'%s.ipynb' % name)
212 self.assertEqual(nb['name'], u'%s.ipynb' % name)
212 self.assertEqual(nb['type'], 'notebook')
213 self.assertEqual(nb['type'], 'notebook')
213 self.assertIn('content', nb)
214 self.assertIn('content', nb)
214 self.assertEqual(nb['format'], 'json')
215 self.assertEqual(nb['format'], 'json')
215 self.assertIn('content', nb)
216 self.assertIn('content', nb)
216 self.assertIn('metadata', nb['content'])
217 self.assertIn('metadata', nb['content'])
217 self.assertIsInstance(nb['content']['metadata'], dict)
218 self.assertIsInstance(nb['content']['metadata'], dict)
218
219
219 def test_get_contents_no_such_file(self):
220 def test_get_contents_no_such_file(self):
220 # Name that doesn't exist - should be a 404
221 # Name that doesn't exist - should be a 404
221 with assert_http_error(404):
222 with assert_http_error(404):
222 self.api.read('q.ipynb', 'foo')
223 self.api.read('q.ipynb', 'foo')
223
224
224 def test_get_text_file_contents(self):
225 def test_get_text_file_contents(self):
225 for d, name in self.dirs_nbs:
226 for d, name in self.dirs_nbs:
226 model = self.api.read(u'%s.txt' % name, d+'/').json()
227 model = self.api.read(u'%s.txt' % name, d+'/').json()
227 self.assertEqual(model['name'], u'%s.txt' % name)
228 self.assertEqual(model['name'], u'%s.txt' % name)
228 self.assertIn('content', model)
229 self.assertIn('content', model)
229 self.assertEqual(model['format'], 'text')
230 self.assertEqual(model['format'], 'text')
230 self.assertEqual(model['type'], 'file')
231 self.assertEqual(model['type'], 'file')
231 self.assertEqual(model['content'], self._txt_for_name(name))
232 self.assertEqual(model['content'], self._txt_for_name(name))
232
233
233 # Name that doesn't exist - should be a 404
234 # Name that doesn't exist - should be a 404
234 with assert_http_error(404):
235 with assert_http_error(404):
235 self.api.read('q.txt', 'foo')
236 self.api.read('q.txt', 'foo')
236
237
237 def test_get_binary_file_contents(self):
238 def test_get_binary_file_contents(self):
238 for d, name in self.dirs_nbs:
239 for d, name in self.dirs_nbs:
239 model = self.api.read(u'%s.blob' % name, d+'/').json()
240 model = self.api.read(u'%s.blob' % name, d+'/').json()
240 self.assertEqual(model['name'], u'%s.blob' % name)
241 self.assertEqual(model['name'], u'%s.blob' % name)
241 self.assertIn('content', model)
242 self.assertIn('content', model)
242 self.assertEqual(model['format'], 'base64')
243 self.assertEqual(model['format'], 'base64')
243 self.assertEqual(model['type'], 'file')
244 self.assertEqual(model['type'], 'file')
244 b64_data = base64.encodestring(self._blob_for_name(name)).decode('ascii')
245 b64_data = base64.encodestring(self._blob_for_name(name)).decode('ascii')
245 self.assertEqual(model['content'], b64_data)
246 self.assertEqual(model['content'], b64_data)
246
247
247 # Name that doesn't exist - should be a 404
248 # Name that doesn't exist - should be a 404
248 with assert_http_error(404):
249 with assert_http_error(404):
249 self.api.read('q.txt', 'foo')
250 self.api.read('q.txt', 'foo')
250
251
251 def _check_created(self, resp, name, path, type='notebook'):
252 def _check_created(self, resp, name, path, type='notebook'):
252 self.assertEqual(resp.status_code, 201)
253 self.assertEqual(resp.status_code, 201)
253 location_header = py3compat.str_to_unicode(resp.headers['Location'])
254 location_header = py3compat.str_to_unicode(resp.headers['Location'])
254 self.assertEqual(location_header, url_escape(url_path_join(u'/api/contents', path, name)))
255 self.assertEqual(location_header, url_escape(url_path_join(u'/api/contents', path, name)))
255 rjson = resp.json()
256 rjson = resp.json()
256 self.assertEqual(rjson['name'], name)
257 self.assertEqual(rjson['name'], name)
257 self.assertEqual(rjson['path'], path)
258 self.assertEqual(rjson['path'], path)
258 self.assertEqual(rjson['type'], type)
259 self.assertEqual(rjson['type'], type)
259 isright = os.path.isdir if type == 'directory' else os.path.isfile
260 isright = os.path.isdir if type == 'directory' else os.path.isfile
260 assert isright(pjoin(
261 assert isright(pjoin(
261 self.notebook_dir.name,
262 self.notebook_dir.name,
262 path.replace('/', os.sep),
263 path.replace('/', os.sep),
263 name,
264 name,
264 ))
265 ))
265
266
266 def test_create_untitled(self):
267 def test_create_untitled(self):
267 resp = self.api.create_untitled(path=u'å b')
268 resp = self.api.create_untitled(path=u'å b')
268 self._check_created(resp, 'Untitled0.ipynb', u'å b')
269 self._check_created(resp, 'Untitled0.ipynb', u'å b')
269
270
270 # Second time
271 # Second time
271 resp = self.api.create_untitled(path=u'å b')
272 resp = self.api.create_untitled(path=u'å b')
272 self._check_created(resp, 'Untitled1.ipynb', u'å b')
273 self._check_created(resp, 'Untitled1.ipynb', u'å b')
273
274
274 # And two directories down
275 # And two directories down
275 resp = self.api.create_untitled(path='foo/bar')
276 resp = self.api.create_untitled(path='foo/bar')
276 self._check_created(resp, 'Untitled0.ipynb', 'foo/bar')
277 self._check_created(resp, 'Untitled0.ipynb', 'foo/bar')
277
278
278 def test_create_untitled_txt(self):
279 def test_create_untitled_txt(self):
279 resp = self.api.create_untitled(path='foo/bar', ext='.txt')
280 resp = self.api.create_untitled(path='foo/bar', ext='.txt')
280 self._check_created(resp, 'untitled0.txt', 'foo/bar', type='file')
281 self._check_created(resp, 'untitled0.txt', 'foo/bar', type='file')
281
282
282 resp = self.api.read(path='foo/bar', name='untitled0.txt')
283 resp = self.api.read(path='foo/bar', name='untitled0.txt')
283 model = resp.json()
284 model = resp.json()
284 self.assertEqual(model['type'], 'file')
285 self.assertEqual(model['type'], 'file')
285 self.assertEqual(model['format'], 'text')
286 self.assertEqual(model['format'], 'text')
286 self.assertEqual(model['content'], '')
287 self.assertEqual(model['content'], '')
287
288
288 def test_upload_untitled(self):
289 def test_upload_untitled(self):
289 nb = new_notebook()
290 nb = new_notebook()
290 nbmodel = {'content': nb, 'type': 'notebook'}
291 nbmodel = {'content': nb, 'type': 'notebook'}
291 resp = self.api.upload_untitled(path=u'å b',
292 resp = self.api.upload_untitled(path=u'å b',
292 body=json.dumps(nbmodel))
293 body=json.dumps(nbmodel))
293 self._check_created(resp, 'Untitled0.ipynb', u'å b')
294 self._check_created(resp, 'Untitled0.ipynb', u'å b')
294
295
295 def test_upload(self):
296 def test_upload(self):
296 nb = new_notebook()
297 nb = new_notebook()
297 nbmodel = {'content': nb, 'type': 'notebook'}
298 nbmodel = {'content': nb, 'type': 'notebook'}
298 resp = self.api.upload(u'Upload tést.ipynb', path=u'å b',
299 resp = self.api.upload(u'Upload tést.ipynb', path=u'å b',
299 body=json.dumps(nbmodel))
300 body=json.dumps(nbmodel))
300 self._check_created(resp, u'Upload tést.ipynb', u'å b')
301 self._check_created(resp, u'Upload tést.ipynb', u'å b')
301
302
302 def test_mkdir(self):
303 def test_mkdir(self):
303 resp = self.api.mkdir(u'New ∂ir', path=u'å b')
304 resp = self.api.mkdir(u'New ∂ir', path=u'å b')
304 self._check_created(resp, u'New ∂ir', u'å b', type='directory')
305 self._check_created(resp, u'New ∂ir', u'å b', type='directory')
305
306
306 def test_mkdir_hidden_400(self):
307 def test_mkdir_hidden_400(self):
307 with assert_http_error(400):
308 with assert_http_error(400):
308 resp = self.api.mkdir(u'.hidden', path=u'å b')
309 resp = self.api.mkdir(u'.hidden', path=u'å b')
309
310
310 def test_upload_txt(self):
311 def test_upload_txt(self):
311 body = u'ünicode téxt'
312 body = u'ünicode téxt'
312 model = {
313 model = {
313 'content' : body,
314 'content' : body,
314 'format' : 'text',
315 'format' : 'text',
315 'type' : 'file',
316 'type' : 'file',
316 }
317 }
317 resp = self.api.upload(u'Upload tést.txt', path=u'å b',
318 resp = self.api.upload(u'Upload tést.txt', path=u'å b',
318 body=json.dumps(model))
319 body=json.dumps(model))
319
320
320 # check roundtrip
321 # check roundtrip
321 resp = self.api.read(path=u'å b', name=u'Upload tést.txt')
322 resp = self.api.read(path=u'å b', name=u'Upload tést.txt')
322 model = resp.json()
323 model = resp.json()
323 self.assertEqual(model['type'], 'file')
324 self.assertEqual(model['type'], 'file')
324 self.assertEqual(model['format'], 'text')
325 self.assertEqual(model['format'], 'text')
325 self.assertEqual(model['content'], body)
326 self.assertEqual(model['content'], body)
326
327
327 def test_upload_b64(self):
328 def test_upload_b64(self):
328 body = b'\xFFblob'
329 body = b'\xFFblob'
329 b64body = base64.encodestring(body).decode('ascii')
330 b64body = base64.encodestring(body).decode('ascii')
330 model = {
331 model = {
331 'content' : b64body,
332 'content' : b64body,
332 'format' : 'base64',
333 'format' : 'base64',
333 'type' : 'file',
334 'type' : 'file',
334 }
335 }
335 resp = self.api.upload(u'Upload tést.blob', path=u'å b',
336 resp = self.api.upload(u'Upload tést.blob', path=u'å b',
336 body=json.dumps(model))
337 body=json.dumps(model))
337
338
338 # check roundtrip
339 # check roundtrip
339 resp = self.api.read(path=u'å b', name=u'Upload tést.blob')
340 resp = self.api.read(path=u'å b', name=u'Upload tést.blob')
340 model = resp.json()
341 model = resp.json()
341 self.assertEqual(model['type'], 'file')
342 self.assertEqual(model['type'], 'file')
342 self.assertEqual(model['format'], 'base64')
343 self.assertEqual(model['format'], 'base64')
343 decoded = base64.decodestring(model['content'].encode('ascii'))
344 decoded = base64.decodestring(model['content'].encode('ascii'))
344 self.assertEqual(decoded, body)
345 self.assertEqual(decoded, body)
345
346
346 def test_upload_v2(self):
347 def test_upload_v2(self):
347 nb = v2.new_notebook()
348 nb = v2.new_notebook()
348 ws = v2.new_worksheet()
349 ws = v2.new_worksheet()
349 nb.worksheets.append(ws)
350 nb.worksheets.append(ws)
350 ws.cells.append(v2.new_code_cell(input='print("hi")'))
351 ws.cells.append(v2.new_code_cell(input='print("hi")'))
351 nbmodel = {'content': nb, 'type': 'notebook'}
352 nbmodel = {'content': nb, 'type': 'notebook'}
352 resp = self.api.upload(u'Upload tést.ipynb', path=u'å b',
353 resp = self.api.upload(u'Upload tést.ipynb', path=u'å b',
353 body=json.dumps(nbmodel))
354 body=json.dumps(nbmodel))
354 self._check_created(resp, u'Upload tést.ipynb', u'å b')
355 self._check_created(resp, u'Upload tést.ipynb', u'å b')
355 resp = self.api.read(u'Upload tést.ipynb', u'å b')
356 resp = self.api.read(u'Upload tést.ipynb', u'å b')
356 data = resp.json()
357 data = resp.json()
357 self.assertEqual(data['content']['nbformat'], current.nbformat)
358 self.assertEqual(data['content']['nbformat'], 4)
358
359
359 def test_copy_untitled(self):
360 def test_copy_untitled(self):
360 resp = self.api.copy_untitled(u'ç d.ipynb', path=u'å b')
361 resp = self.api.copy_untitled(u'ç d.ipynb', path=u'å b')
361 self._check_created(resp, u'ç d-Copy0.ipynb', u'å b')
362 self._check_created(resp, u'ç d-Copy0.ipynb', u'å b')
362
363
363 def test_copy(self):
364 def test_copy(self):
364 resp = self.api.copy(u'ç d.ipynb', u'cøpy.ipynb', path=u'å b')
365 resp = self.api.copy(u'ç d.ipynb', u'cøpy.ipynb', path=u'å b')
365 self._check_created(resp, u'cøpy.ipynb', u'å b')
366 self._check_created(resp, u'cøpy.ipynb', u'å b')
366
367
367 def test_copy_path(self):
368 def test_copy_path(self):
368 resp = self.api.copy(u'foo/a.ipynb', u'cøpyfoo.ipynb', path=u'å b')
369 resp = self.api.copy(u'foo/a.ipynb', u'cøpyfoo.ipynb', path=u'å b')
369 self._check_created(resp, u'cøpyfoo.ipynb', u'å b')
370 self._check_created(resp, u'cøpyfoo.ipynb', u'å b')
370
371
371 def test_copy_dir_400(self):
372 def test_copy_dir_400(self):
372 # can't copy directories
373 # can't copy directories
373 with assert_http_error(400):
374 with assert_http_error(400):
374 resp = self.api.copy(u'å b', u'å c')
375 resp = self.api.copy(u'å b', u'å c')
375
376
376 def test_delete(self):
377 def test_delete(self):
377 for d, name in self.dirs_nbs:
378 for d, name in self.dirs_nbs:
378 resp = self.api.delete('%s.ipynb' % name, d)
379 resp = self.api.delete('%s.ipynb' % name, d)
379 self.assertEqual(resp.status_code, 204)
380 self.assertEqual(resp.status_code, 204)
380
381
381 for d in self.dirs + ['/']:
382 for d in self.dirs + ['/']:
382 nbs = notebooks_only(self.api.list(d).json())
383 nbs = notebooks_only(self.api.list(d).json())
383 self.assertEqual(len(nbs), 0)
384 self.assertEqual(len(nbs), 0)
384
385
385 def test_delete_dirs(self):
386 def test_delete_dirs(self):
386 # depth-first delete everything, so we don't try to delete empty directories
387 # depth-first delete everything, so we don't try to delete empty directories
387 for name in sorted(self.dirs + ['/'], key=len, reverse=True):
388 for name in sorted(self.dirs + ['/'], key=len, reverse=True):
388 listing = self.api.list(name).json()['content']
389 listing = self.api.list(name).json()['content']
389 for model in listing:
390 for model in listing:
390 self.api.delete(model['name'], model['path'])
391 self.api.delete(model['name'], model['path'])
391 listing = self.api.list('/').json()['content']
392 listing = self.api.list('/').json()['content']
392 self.assertEqual(listing, [])
393 self.assertEqual(listing, [])
393
394
394 def test_delete_non_empty_dir(self):
395 def test_delete_non_empty_dir(self):
395 """delete non-empty dir raises 400"""
396 """delete non-empty dir raises 400"""
396 with assert_http_error(400):
397 with assert_http_error(400):
397 self.api.delete(u'å b')
398 self.api.delete(u'å b')
398
399
399 def test_rename(self):
400 def test_rename(self):
400 resp = self.api.rename('a.ipynb', 'foo', 'z.ipynb')
401 resp = self.api.rename('a.ipynb', 'foo', 'z.ipynb')
401 self.assertEqual(resp.headers['Location'].split('/')[-1], 'z.ipynb')
402 self.assertEqual(resp.headers['Location'].split('/')[-1], 'z.ipynb')
402 self.assertEqual(resp.json()['name'], 'z.ipynb')
403 self.assertEqual(resp.json()['name'], 'z.ipynb')
403 assert os.path.isfile(pjoin(self.notebook_dir.name, 'foo', 'z.ipynb'))
404 assert os.path.isfile(pjoin(self.notebook_dir.name, 'foo', 'z.ipynb'))
404
405
405 nbs = notebooks_only(self.api.list('foo').json())
406 nbs = notebooks_only(self.api.list('foo').json())
406 nbnames = set(n['name'] for n in nbs)
407 nbnames = set(n['name'] for n in nbs)
407 self.assertIn('z.ipynb', nbnames)
408 self.assertIn('z.ipynb', nbnames)
408 self.assertNotIn('a.ipynb', nbnames)
409 self.assertNotIn('a.ipynb', nbnames)
409
410
410 def test_rename_existing(self):
411 def test_rename_existing(self):
411 with assert_http_error(409):
412 with assert_http_error(409):
412 self.api.rename('a.ipynb', 'foo', 'b.ipynb')
413 self.api.rename('a.ipynb', 'foo', 'b.ipynb')
413
414
414 def test_save(self):
415 def test_save(self):
415 resp = self.api.read('a.ipynb', 'foo')
416 resp = self.api.read('a.ipynb', 'foo')
416 nbcontent = json.loads(resp.text)['content']
417 nbcontent = json.loads(resp.text)['content']
417 nb = from_dict(nbcontent)
418 nb = from_dict(nbcontent)
418 nb.cells.append(new_markdown_cell(u'Created by test ³'))
419 nb.cells.append(new_markdown_cell(u'Created by test ³'))
419
420
420 nbmodel= {'name': 'a.ipynb', 'path':'foo', 'content': nb, 'type': 'notebook'}
421 nbmodel= {'name': 'a.ipynb', 'path':'foo', 'content': nb, 'type': 'notebook'}
421 resp = self.api.save('a.ipynb', path='foo', body=json.dumps(nbmodel))
422 resp = self.api.save('a.ipynb', path='foo', body=json.dumps(nbmodel))
422
423
423 nbfile = pjoin(self.notebook_dir.name, 'foo', 'a.ipynb')
424 nbfile = pjoin(self.notebook_dir.name, 'foo', 'a.ipynb')
424 with io.open(nbfile, 'r', encoding='utf-8') as f:
425 with io.open(nbfile, 'r', encoding='utf-8') as f:
425 newnb = read(f, format='ipynb')
426 newnb = read(f, as_version=4)
426 self.assertEqual(newnb.cells[0].source,
427 self.assertEqual(newnb.cells[0].source,
427 u'Created by test ³')
428 u'Created by test ³')
428 nbcontent = self.api.read('a.ipynb', 'foo').json()['content']
429 nbcontent = self.api.read('a.ipynb', 'foo').json()['content']
429 newnb = from_dict(nbcontent)
430 newnb = from_dict(nbcontent)
430 self.assertEqual(newnb.cells[0].source,
431 self.assertEqual(newnb.cells[0].source,
431 u'Created by test ³')
432 u'Created by test ³')
432
433
433 # Save and rename
434 # Save and rename
434 nbmodel= {'name': 'a2.ipynb', 'path':'foo/bar', 'content': nb, 'type': 'notebook'}
435 nbmodel= {'name': 'a2.ipynb', 'path':'foo/bar', 'content': nb, 'type': 'notebook'}
435 resp = self.api.save('a.ipynb', path='foo', body=json.dumps(nbmodel))
436 resp = self.api.save('a.ipynb', path='foo', body=json.dumps(nbmodel))
436 saved = resp.json()
437 saved = resp.json()
437 self.assertEqual(saved['name'], 'a2.ipynb')
438 self.assertEqual(saved['name'], 'a2.ipynb')
438 self.assertEqual(saved['path'], 'foo/bar')
439 self.assertEqual(saved['path'], 'foo/bar')
439 assert os.path.isfile(pjoin(self.notebook_dir.name,'foo','bar','a2.ipynb'))
440 assert os.path.isfile(pjoin(self.notebook_dir.name,'foo','bar','a2.ipynb'))
440 assert not os.path.isfile(pjoin(self.notebook_dir.name, 'foo', 'a.ipynb'))
441 assert not os.path.isfile(pjoin(self.notebook_dir.name, 'foo', 'a.ipynb'))
441 with assert_http_error(404):
442 with assert_http_error(404):
442 self.api.read('a.ipynb', 'foo')
443 self.api.read('a.ipynb', 'foo')
443
444
444 def test_checkpoints(self):
445 def test_checkpoints(self):
445 resp = self.api.read('a.ipynb', 'foo')
446 resp = self.api.read('a.ipynb', 'foo')
446 r = self.api.new_checkpoint('a.ipynb', 'foo')
447 r = self.api.new_checkpoint('a.ipynb', 'foo')
447 self.assertEqual(r.status_code, 201)
448 self.assertEqual(r.status_code, 201)
448 cp1 = r.json()
449 cp1 = r.json()
449 self.assertEqual(set(cp1), {'id', 'last_modified'})
450 self.assertEqual(set(cp1), {'id', 'last_modified'})
450 self.assertEqual(r.headers['Location'].split('/')[-1], cp1['id'])
451 self.assertEqual(r.headers['Location'].split('/')[-1], cp1['id'])
451
452
452 # Modify it
453 # Modify it
453 nbcontent = json.loads(resp.text)['content']
454 nbcontent = json.loads(resp.text)['content']
454 nb = from_dict(nbcontent)
455 nb = from_dict(nbcontent)
455 hcell = new_markdown_cell('Created by test')
456 hcell = new_markdown_cell('Created by test')
456 nb.cells.append(hcell)
457 nb.cells.append(hcell)
457 # Save
458 # Save
458 nbmodel= {'name': 'a.ipynb', 'path':'foo', 'content': nb, 'type': 'notebook'}
459 nbmodel= {'name': 'a.ipynb', 'path':'foo', 'content': nb, 'type': 'notebook'}
459 resp = self.api.save('a.ipynb', path='foo', body=json.dumps(nbmodel))
460 resp = self.api.save('a.ipynb', path='foo', body=json.dumps(nbmodel))
460
461
461 # List checkpoints
462 # List checkpoints
462 cps = self.api.get_checkpoints('a.ipynb', 'foo').json()
463 cps = self.api.get_checkpoints('a.ipynb', 'foo').json()
463 self.assertEqual(cps, [cp1])
464 self.assertEqual(cps, [cp1])
464
465
465 nbcontent = self.api.read('a.ipynb', 'foo').json()['content']
466 nbcontent = self.api.read('a.ipynb', 'foo').json()['content']
466 nb = from_dict(nbcontent)
467 nb = from_dict(nbcontent)
467 self.assertEqual(nb.cells[0].source, 'Created by test')
468 self.assertEqual(nb.cells[0].source, 'Created by test')
468
469
469 # Restore cp1
470 # Restore cp1
470 r = self.api.restore_checkpoint('a.ipynb', 'foo', cp1['id'])
471 r = self.api.restore_checkpoint('a.ipynb', 'foo', cp1['id'])
471 self.assertEqual(r.status_code, 204)
472 self.assertEqual(r.status_code, 204)
472 nbcontent = self.api.read('a.ipynb', 'foo').json()['content']
473 nbcontent = self.api.read('a.ipynb', 'foo').json()['content']
473 nb = from_dict(nbcontent)
474 nb = from_dict(nbcontent)
474 self.assertEqual(nb.cells, [])
475 self.assertEqual(nb.cells, [])
475
476
476 # Delete cp1
477 # Delete cp1
477 r = self.api.delete_checkpoint('a.ipynb', 'foo', cp1['id'])
478 r = self.api.delete_checkpoint('a.ipynb', 'foo', cp1['id'])
478 self.assertEqual(r.status_code, 204)
479 self.assertEqual(r.status_code, 204)
479 cps = self.api.get_checkpoints('a.ipynb', 'foo').json()
480 cps = self.api.get_checkpoints('a.ipynb', 'foo').json()
480 self.assertEqual(cps, [])
481 self.assertEqual(cps, [])
@@ -1,332 +1,332
1 # coding: utf-8
1 # coding: utf-8
2 """Tests for the notebook manager."""
2 """Tests for the notebook manager."""
3 from __future__ import print_function
3 from __future__ import print_function
4
4
5 import logging
5 import logging
6 import os
6 import os
7
7
8 from tornado.web import HTTPError
8 from tornado.web import HTTPError
9 from unittest import TestCase
9 from unittest import TestCase
10 from tempfile import NamedTemporaryFile
10 from tempfile import NamedTemporaryFile
11
11
12 from IPython.nbformat import current
12 from IPython.nbformat import v4 as nbformat
13
13
14 from IPython.utils.tempdir import TemporaryDirectory
14 from IPython.utils.tempdir import TemporaryDirectory
15 from IPython.utils.traitlets import TraitError
15 from IPython.utils.traitlets import TraitError
16 from IPython.html.utils import url_path_join
16 from IPython.html.utils import url_path_join
17 from IPython.testing import decorators as dec
17 from IPython.testing import decorators as dec
18
18
19 from ..filemanager import FileContentsManager
19 from ..filemanager import FileContentsManager
20 from ..manager import ContentsManager
20 from ..manager import ContentsManager
21
21
22
22
23 class TestFileContentsManager(TestCase):
23 class TestFileContentsManager(TestCase):
24
24
25 def test_root_dir(self):
25 def test_root_dir(self):
26 with TemporaryDirectory() as td:
26 with TemporaryDirectory() as td:
27 fm = FileContentsManager(root_dir=td)
27 fm = FileContentsManager(root_dir=td)
28 self.assertEqual(fm.root_dir, td)
28 self.assertEqual(fm.root_dir, td)
29
29
30 def test_missing_root_dir(self):
30 def test_missing_root_dir(self):
31 with TemporaryDirectory() as td:
31 with TemporaryDirectory() as td:
32 root = os.path.join(td, 'notebook', 'dir', 'is', 'missing')
32 root = os.path.join(td, 'notebook', 'dir', 'is', 'missing')
33 self.assertRaises(TraitError, FileContentsManager, root_dir=root)
33 self.assertRaises(TraitError, FileContentsManager, root_dir=root)
34
34
35 def test_invalid_root_dir(self):
35 def test_invalid_root_dir(self):
36 with NamedTemporaryFile() as tf:
36 with NamedTemporaryFile() as tf:
37 self.assertRaises(TraitError, FileContentsManager, root_dir=tf.name)
37 self.assertRaises(TraitError, FileContentsManager, root_dir=tf.name)
38
38
39 def test_get_os_path(self):
39 def test_get_os_path(self):
40 # full filesystem path should be returned with correct operating system
40 # full filesystem path should be returned with correct operating system
41 # separators.
41 # separators.
42 with TemporaryDirectory() as td:
42 with TemporaryDirectory() as td:
43 root = td
43 root = td
44 fm = FileContentsManager(root_dir=root)
44 fm = FileContentsManager(root_dir=root)
45 path = fm._get_os_path('test.ipynb', '/path/to/notebook/')
45 path = fm._get_os_path('test.ipynb', '/path/to/notebook/')
46 rel_path_list = '/path/to/notebook/test.ipynb'.split('/')
46 rel_path_list = '/path/to/notebook/test.ipynb'.split('/')
47 fs_path = os.path.join(fm.root_dir, *rel_path_list)
47 fs_path = os.path.join(fm.root_dir, *rel_path_list)
48 self.assertEqual(path, fs_path)
48 self.assertEqual(path, fs_path)
49
49
50 fm = FileContentsManager(root_dir=root)
50 fm = FileContentsManager(root_dir=root)
51 path = fm._get_os_path('test.ipynb')
51 path = fm._get_os_path('test.ipynb')
52 fs_path = os.path.join(fm.root_dir, 'test.ipynb')
52 fs_path = os.path.join(fm.root_dir, 'test.ipynb')
53 self.assertEqual(path, fs_path)
53 self.assertEqual(path, fs_path)
54
54
55 fm = FileContentsManager(root_dir=root)
55 fm = FileContentsManager(root_dir=root)
56 path = fm._get_os_path('test.ipynb', '////')
56 path = fm._get_os_path('test.ipynb', '////')
57 fs_path = os.path.join(fm.root_dir, 'test.ipynb')
57 fs_path = os.path.join(fm.root_dir, 'test.ipynb')
58 self.assertEqual(path, fs_path)
58 self.assertEqual(path, fs_path)
59
59
60 def test_checkpoint_subdir(self):
60 def test_checkpoint_subdir(self):
61 subd = u'sub ∂ir'
61 subd = u'sub ∂ir'
62 cp_name = 'test-cp.ipynb'
62 cp_name = 'test-cp.ipynb'
63 with TemporaryDirectory() as td:
63 with TemporaryDirectory() as td:
64 root = td
64 root = td
65 os.mkdir(os.path.join(td, subd))
65 os.mkdir(os.path.join(td, subd))
66 fm = FileContentsManager(root_dir=root)
66 fm = FileContentsManager(root_dir=root)
67 cp_dir = fm.get_checkpoint_path('cp', 'test.ipynb', '/')
67 cp_dir = fm.get_checkpoint_path('cp', 'test.ipynb', '/')
68 cp_subdir = fm.get_checkpoint_path('cp', 'test.ipynb', '/%s/' % subd)
68 cp_subdir = fm.get_checkpoint_path('cp', 'test.ipynb', '/%s/' % subd)
69 self.assertNotEqual(cp_dir, cp_subdir)
69 self.assertNotEqual(cp_dir, cp_subdir)
70 self.assertEqual(cp_dir, os.path.join(root, fm.checkpoint_dir, cp_name))
70 self.assertEqual(cp_dir, os.path.join(root, fm.checkpoint_dir, cp_name))
71 self.assertEqual(cp_subdir, os.path.join(root, subd, fm.checkpoint_dir, cp_name))
71 self.assertEqual(cp_subdir, os.path.join(root, subd, fm.checkpoint_dir, cp_name))
72
72
73
73
74 class TestContentsManager(TestCase):
74 class TestContentsManager(TestCase):
75
75
76 def setUp(self):
76 def setUp(self):
77 self._temp_dir = TemporaryDirectory()
77 self._temp_dir = TemporaryDirectory()
78 self.td = self._temp_dir.name
78 self.td = self._temp_dir.name
79 self.contents_manager = FileContentsManager(
79 self.contents_manager = FileContentsManager(
80 root_dir=self.td,
80 root_dir=self.td,
81 log=logging.getLogger()
81 log=logging.getLogger()
82 )
82 )
83
83
84 def tearDown(self):
84 def tearDown(self):
85 self._temp_dir.cleanup()
85 self._temp_dir.cleanup()
86
86
87 def make_dir(self, abs_path, rel_path):
87 def make_dir(self, abs_path, rel_path):
88 """make subdirectory, rel_path is the relative path
88 """make subdirectory, rel_path is the relative path
89 to that directory from the location where the server started"""
89 to that directory from the location where the server started"""
90 os_path = os.path.join(abs_path, rel_path)
90 os_path = os.path.join(abs_path, rel_path)
91 try:
91 try:
92 os.makedirs(os_path)
92 os.makedirs(os_path)
93 except OSError:
93 except OSError:
94 print("Directory already exists: %r" % os_path)
94 print("Directory already exists: %r" % os_path)
95 return os_path
95 return os_path
96
96
97 def add_code_cell(self, nb):
97 def add_code_cell(self, nb):
98 output = current.new_output("display_data", {'application/javascript': "alert('hi');"})
98 output = nbformat.new_output("display_data", {'application/javascript': "alert('hi');"})
99 cell = current.new_code_cell("print('hi')", outputs=[output])
99 cell = nbformat.new_code_cell("print('hi')", outputs=[output])
100 nb.cells.append(cell)
100 nb.cells.append(cell)
101
101
102 def new_notebook(self):
102 def new_notebook(self):
103 cm = self.contents_manager
103 cm = self.contents_manager
104 model = cm.create_file()
104 model = cm.create_file()
105 name = model['name']
105 name = model['name']
106 path = model['path']
106 path = model['path']
107
107
108 full_model = cm.get_model(name, path)
108 full_model = cm.get_model(name, path)
109 nb = full_model['content']
109 nb = full_model['content']
110 self.add_code_cell(nb)
110 self.add_code_cell(nb)
111
111
112 cm.save(full_model, name, path)
112 cm.save(full_model, name, path)
113 return nb, name, path
113 return nb, name, path
114
114
115 def test_create_file(self):
115 def test_create_file(self):
116 cm = self.contents_manager
116 cm = self.contents_manager
117 # Test in root directory
117 # Test in root directory
118 model = cm.create_file()
118 model = cm.create_file()
119 assert isinstance(model, dict)
119 assert isinstance(model, dict)
120 self.assertIn('name', model)
120 self.assertIn('name', model)
121 self.assertIn('path', model)
121 self.assertIn('path', model)
122 self.assertEqual(model['name'], 'Untitled0.ipynb')
122 self.assertEqual(model['name'], 'Untitled0.ipynb')
123 self.assertEqual(model['path'], '')
123 self.assertEqual(model['path'], '')
124
124
125 # Test in sub-directory
125 # Test in sub-directory
126 sub_dir = '/foo/'
126 sub_dir = '/foo/'
127 self.make_dir(cm.root_dir, 'foo')
127 self.make_dir(cm.root_dir, 'foo')
128 model = cm.create_file(None, sub_dir)
128 model = cm.create_file(None, sub_dir)
129 assert isinstance(model, dict)
129 assert isinstance(model, dict)
130 self.assertIn('name', model)
130 self.assertIn('name', model)
131 self.assertIn('path', model)
131 self.assertIn('path', model)
132 self.assertEqual(model['name'], 'Untitled0.ipynb')
132 self.assertEqual(model['name'], 'Untitled0.ipynb')
133 self.assertEqual(model['path'], sub_dir.strip('/'))
133 self.assertEqual(model['path'], sub_dir.strip('/'))
134
134
135 def test_get(self):
135 def test_get(self):
136 cm = self.contents_manager
136 cm = self.contents_manager
137 # Create a notebook
137 # Create a notebook
138 model = cm.create_file()
138 model = cm.create_file()
139 name = model['name']
139 name = model['name']
140 path = model['path']
140 path = model['path']
141
141
142 # Check that we 'get' on the notebook we just created
142 # Check that we 'get' on the notebook we just created
143 model2 = cm.get_model(name, path)
143 model2 = cm.get_model(name, path)
144 assert isinstance(model2, dict)
144 assert isinstance(model2, dict)
145 self.assertIn('name', model2)
145 self.assertIn('name', model2)
146 self.assertIn('path', model2)
146 self.assertIn('path', model2)
147 self.assertEqual(model['name'], name)
147 self.assertEqual(model['name'], name)
148 self.assertEqual(model['path'], path)
148 self.assertEqual(model['path'], path)
149
149
150 # Test in sub-directory
150 # Test in sub-directory
151 sub_dir = '/foo/'
151 sub_dir = '/foo/'
152 self.make_dir(cm.root_dir, 'foo')
152 self.make_dir(cm.root_dir, 'foo')
153 model = cm.create_file(None, sub_dir)
153 model = cm.create_file(None, sub_dir)
154 model2 = cm.get_model(name, sub_dir)
154 model2 = cm.get_model(name, sub_dir)
155 assert isinstance(model2, dict)
155 assert isinstance(model2, dict)
156 self.assertIn('name', model2)
156 self.assertIn('name', model2)
157 self.assertIn('path', model2)
157 self.assertIn('path', model2)
158 self.assertIn('content', model2)
158 self.assertIn('content', model2)
159 self.assertEqual(model2['name'], 'Untitled0.ipynb')
159 self.assertEqual(model2['name'], 'Untitled0.ipynb')
160 self.assertEqual(model2['path'], sub_dir.strip('/'))
160 self.assertEqual(model2['path'], sub_dir.strip('/'))
161
161
162 @dec.skip_win32
162 @dec.skip_win32
163 def test_bad_symlink(self):
163 def test_bad_symlink(self):
164 cm = self.contents_manager
164 cm = self.contents_manager
165 path = 'test bad symlink'
165 path = 'test bad symlink'
166 os_path = self.make_dir(cm.root_dir, path)
166 os_path = self.make_dir(cm.root_dir, path)
167
167
168 file_model = cm.create_file(path=path, ext='.txt')
168 file_model = cm.create_file(path=path, ext='.txt')
169
169
170 # create a broken symlink
170 # create a broken symlink
171 os.symlink("target", os.path.join(os_path, "bad symlink"))
171 os.symlink("target", os.path.join(os_path, "bad symlink"))
172 model = cm.get_model(path)
172 model = cm.get_model(path)
173 self.assertEqual(model['content'], [file_model])
173 self.assertEqual(model['content'], [file_model])
174
174
175 @dec.skip_win32
175 @dec.skip_win32
176 def test_good_symlink(self):
176 def test_good_symlink(self):
177 cm = self.contents_manager
177 cm = self.contents_manager
178 path = 'test good symlink'
178 path = 'test good symlink'
179 os_path = self.make_dir(cm.root_dir, path)
179 os_path = self.make_dir(cm.root_dir, path)
180
180
181 file_model = cm.create_file(path=path, ext='.txt')
181 file_model = cm.create_file(path=path, ext='.txt')
182
182
183 # create a good symlink
183 # create a good symlink
184 os.symlink(file_model['name'], os.path.join(os_path, "good symlink"))
184 os.symlink(file_model['name'], os.path.join(os_path, "good symlink"))
185 symlink_model = cm.get_model(name="good symlink", path=path, content=False)
185 symlink_model = cm.get_model(name="good symlink", path=path, content=False)
186
186
187 dir_model = cm.get_model(path)
187 dir_model = cm.get_model(path)
188 self.assertEqual(
188 self.assertEqual(
189 sorted(dir_model['content'], key=lambda x: x['name']),
189 sorted(dir_model['content'], key=lambda x: x['name']),
190 [symlink_model, file_model],
190 [symlink_model, file_model],
191 )
191 )
192
192
193 def test_update(self):
193 def test_update(self):
194 cm = self.contents_manager
194 cm = self.contents_manager
195 # Create a notebook
195 # Create a notebook
196 model = cm.create_file()
196 model = cm.create_file()
197 name = model['name']
197 name = model['name']
198 path = model['path']
198 path = model['path']
199
199
200 # Change the name in the model for rename
200 # Change the name in the model for rename
201 model['name'] = 'test.ipynb'
201 model['name'] = 'test.ipynb'
202 model = cm.update(model, name, path)
202 model = cm.update(model, name, path)
203 assert isinstance(model, dict)
203 assert isinstance(model, dict)
204 self.assertIn('name', model)
204 self.assertIn('name', model)
205 self.assertIn('path', model)
205 self.assertIn('path', model)
206 self.assertEqual(model['name'], 'test.ipynb')
206 self.assertEqual(model['name'], 'test.ipynb')
207
207
208 # Make sure the old name is gone
208 # Make sure the old name is gone
209 self.assertRaises(HTTPError, cm.get_model, name, path)
209 self.assertRaises(HTTPError, cm.get_model, name, path)
210
210
211 # Test in sub-directory
211 # Test in sub-directory
212 # Create a directory and notebook in that directory
212 # Create a directory and notebook in that directory
213 sub_dir = '/foo/'
213 sub_dir = '/foo/'
214 self.make_dir(cm.root_dir, 'foo')
214 self.make_dir(cm.root_dir, 'foo')
215 model = cm.create_file(None, sub_dir)
215 model = cm.create_file(None, sub_dir)
216 name = model['name']
216 name = model['name']
217 path = model['path']
217 path = model['path']
218
218
219 # Change the name in the model for rename
219 # Change the name in the model for rename
220 model['name'] = 'test_in_sub.ipynb'
220 model['name'] = 'test_in_sub.ipynb'
221 model = cm.update(model, name, path)
221 model = cm.update(model, name, path)
222 assert isinstance(model, dict)
222 assert isinstance(model, dict)
223 self.assertIn('name', model)
223 self.assertIn('name', model)
224 self.assertIn('path', model)
224 self.assertIn('path', model)
225 self.assertEqual(model['name'], 'test_in_sub.ipynb')
225 self.assertEqual(model['name'], 'test_in_sub.ipynb')
226 self.assertEqual(model['path'], sub_dir.strip('/'))
226 self.assertEqual(model['path'], sub_dir.strip('/'))
227
227
228 # Make sure the old name is gone
228 # Make sure the old name is gone
229 self.assertRaises(HTTPError, cm.get_model, name, path)
229 self.assertRaises(HTTPError, cm.get_model, name, path)
230
230
231 def test_save(self):
231 def test_save(self):
232 cm = self.contents_manager
232 cm = self.contents_manager
233 # Create a notebook
233 # Create a notebook
234 model = cm.create_file()
234 model = cm.create_file()
235 name = model['name']
235 name = model['name']
236 path = model['path']
236 path = model['path']
237
237
238 # Get the model with 'content'
238 # Get the model with 'content'
239 full_model = cm.get_model(name, path)
239 full_model = cm.get_model(name, path)
240
240
241 # Save the notebook
241 # Save the notebook
242 model = cm.save(full_model, name, path)
242 model = cm.save(full_model, name, path)
243 assert isinstance(model, dict)
243 assert isinstance(model, dict)
244 self.assertIn('name', model)
244 self.assertIn('name', model)
245 self.assertIn('path', model)
245 self.assertIn('path', model)
246 self.assertEqual(model['name'], name)
246 self.assertEqual(model['name'], name)
247 self.assertEqual(model['path'], path)
247 self.assertEqual(model['path'], path)
248
248
249 # Test in sub-directory
249 # Test in sub-directory
250 # Create a directory and notebook in that directory
250 # Create a directory and notebook in that directory
251 sub_dir = '/foo/'
251 sub_dir = '/foo/'
252 self.make_dir(cm.root_dir, 'foo')
252 self.make_dir(cm.root_dir, 'foo')
253 model = cm.create_file(None, sub_dir)
253 model = cm.create_file(None, sub_dir)
254 name = model['name']
254 name = model['name']
255 path = model['path']
255 path = model['path']
256 model = cm.get_model(name, path)
256 model = cm.get_model(name, path)
257
257
258 # Change the name in the model for rename
258 # Change the name in the model for rename
259 model = cm.save(model, name, path)
259 model = cm.save(model, name, path)
260 assert isinstance(model, dict)
260 assert isinstance(model, dict)
261 self.assertIn('name', model)
261 self.assertIn('name', model)
262 self.assertIn('path', model)
262 self.assertIn('path', model)
263 self.assertEqual(model['name'], 'Untitled0.ipynb')
263 self.assertEqual(model['name'], 'Untitled0.ipynb')
264 self.assertEqual(model['path'], sub_dir.strip('/'))
264 self.assertEqual(model['path'], sub_dir.strip('/'))
265
265
266 def test_delete(self):
266 def test_delete(self):
267 cm = self.contents_manager
267 cm = self.contents_manager
268 # Create a notebook
268 # Create a notebook
269 nb, name, path = self.new_notebook()
269 nb, name, path = self.new_notebook()
270
270
271 # Delete the notebook
271 # Delete the notebook
272 cm.delete(name, path)
272 cm.delete(name, path)
273
273
274 # Check that a 'get' on the deleted notebook raises and error
274 # Check that a 'get' on the deleted notebook raises and error
275 self.assertRaises(HTTPError, cm.get_model, name, path)
275 self.assertRaises(HTTPError, cm.get_model, name, path)
276
276
277 def test_copy(self):
277 def test_copy(self):
278 cm = self.contents_manager
278 cm = self.contents_manager
279 path = u'å b'
279 path = u'å b'
280 name = u'nb √.ipynb'
280 name = u'nb √.ipynb'
281 os.mkdir(os.path.join(cm.root_dir, path))
281 os.mkdir(os.path.join(cm.root_dir, path))
282 orig = cm.create_file({'name' : name}, path=path)
282 orig = cm.create_file({'name' : name}, path=path)
283
283
284 # copy with unspecified name
284 # copy with unspecified name
285 copy = cm.copy(name, path=path)
285 copy = cm.copy(name, path=path)
286 self.assertEqual(copy['name'], orig['name'].replace('.ipynb', '-Copy0.ipynb'))
286 self.assertEqual(copy['name'], orig['name'].replace('.ipynb', '-Copy0.ipynb'))
287
287
288 # copy with specified name
288 # copy with specified name
289 copy2 = cm.copy(name, u'copy 2.ipynb', path=path)
289 copy2 = cm.copy(name, u'copy 2.ipynb', path=path)
290 self.assertEqual(copy2['name'], u'copy 2.ipynb')
290 self.assertEqual(copy2['name'], u'copy 2.ipynb')
291
291
292 def test_trust_notebook(self):
292 def test_trust_notebook(self):
293 cm = self.contents_manager
293 cm = self.contents_manager
294 nb, name, path = self.new_notebook()
294 nb, name, path = self.new_notebook()
295
295
296 untrusted = cm.get_model(name, path)['content']
296 untrusted = cm.get_model(name, path)['content']
297 assert not cm.notary.check_cells(untrusted)
297 assert not cm.notary.check_cells(untrusted)
298
298
299 # print(untrusted)
299 # print(untrusted)
300 cm.trust_notebook(name, path)
300 cm.trust_notebook(name, path)
301 trusted = cm.get_model(name, path)['content']
301 trusted = cm.get_model(name, path)['content']
302 # print(trusted)
302 # print(trusted)
303 assert cm.notary.check_cells(trusted)
303 assert cm.notary.check_cells(trusted)
304
304
305 def test_mark_trusted_cells(self):
305 def test_mark_trusted_cells(self):
306 cm = self.contents_manager
306 cm = self.contents_manager
307 nb, name, path = self.new_notebook()
307 nb, name, path = self.new_notebook()
308
308
309 cm.mark_trusted_cells(nb, name, path)
309 cm.mark_trusted_cells(nb, name, path)
310 for cell in nb.cells:
310 for cell in nb.cells:
311 if cell.cell_type == 'code':
311 if cell.cell_type == 'code':
312 assert not cell.metadata.trusted
312 assert not cell.metadata.trusted
313
313
314 cm.trust_notebook(name, path)
314 cm.trust_notebook(name, path)
315 nb = cm.get_model(name, path)['content']
315 nb = cm.get_model(name, path)['content']
316 for cell in nb.cells:
316 for cell in nb.cells:
317 if cell.cell_type == 'code':
317 if cell.cell_type == 'code':
318 assert cell.metadata.trusted
318 assert cell.metadata.trusted
319
319
320 def test_check_and_sign(self):
320 def test_check_and_sign(self):
321 cm = self.contents_manager
321 cm = self.contents_manager
322 nb, name, path = self.new_notebook()
322 nb, name, path = self.new_notebook()
323
323
324 cm.mark_trusted_cells(nb, name, path)
324 cm.mark_trusted_cells(nb, name, path)
325 cm.check_and_sign(nb, name, path)
325 cm.check_and_sign(nb, name, path)
326 assert not cm.notary.check_signature(nb)
326 assert not cm.notary.check_signature(nb)
327
327
328 cm.trust_notebook(name, path)
328 cm.trust_notebook(name, path)
329 nb = cm.get_model(name, path)['content']
329 nb = cm.get_model(name, path)['content']
330 cm.mark_trusted_cells(nb, name, path)
330 cm.mark_trusted_cells(nb, name, path)
331 cm.check_and_sign(nb, name, path)
331 cm.check_and_sign(nb, name, path)
332 assert cm.notary.check_signature(nb)
332 assert cm.notary.check_signature(nb)
@@ -1,116 +1,117
1 """Test the sessions web service API."""
1 """Test the sessions web service API."""
2
2
3 import errno
3 import errno
4 import io
4 import io
5 import os
5 import os
6 import json
6 import json
7 import requests
7 import requests
8 import shutil
8 import shutil
9
9
10 pjoin = os.path.join
10 pjoin = os.path.join
11
11
12 from IPython.html.utils import url_path_join
12 from IPython.html.utils import url_path_join
13 from IPython.html.tests.launchnotebook import NotebookTestBase, assert_http_error
13 from IPython.html.tests.launchnotebook import NotebookTestBase, assert_http_error
14 from IPython.nbformat.current import new_notebook, write
14 from IPython.nbformat.v4 import new_notebook
15 from IPython.nbformat import write
15
16
16 class SessionAPI(object):
17 class SessionAPI(object):
17 """Wrapper for notebook API calls."""
18 """Wrapper for notebook API calls."""
18 def __init__(self, base_url):
19 def __init__(self, base_url):
19 self.base_url = base_url
20 self.base_url = base_url
20
21
21 def _req(self, verb, path, body=None):
22 def _req(self, verb, path, body=None):
22 response = requests.request(verb,
23 response = requests.request(verb,
23 url_path_join(self.base_url, 'api/sessions', path), data=body)
24 url_path_join(self.base_url, 'api/sessions', path), data=body)
24
25
25 if 400 <= response.status_code < 600:
26 if 400 <= response.status_code < 600:
26 try:
27 try:
27 response.reason = response.json()['message']
28 response.reason = response.json()['message']
28 except:
29 except:
29 pass
30 pass
30 response.raise_for_status()
31 response.raise_for_status()
31
32
32 return response
33 return response
33
34
34 def list(self):
35 def list(self):
35 return self._req('GET', '')
36 return self._req('GET', '')
36
37
37 def get(self, id):
38 def get(self, id):
38 return self._req('GET', id)
39 return self._req('GET', id)
39
40
40 def create(self, name, path, kernel_name='python'):
41 def create(self, name, path, kernel_name='python'):
41 body = json.dumps({'notebook': {'name':name, 'path':path},
42 body = json.dumps({'notebook': {'name':name, 'path':path},
42 'kernel': {'name': kernel_name}})
43 'kernel': {'name': kernel_name}})
43 return self._req('POST', '', body)
44 return self._req('POST', '', body)
44
45
45 def modify(self, id, name, path):
46 def modify(self, id, name, path):
46 body = json.dumps({'notebook': {'name':name, 'path':path}})
47 body = json.dumps({'notebook': {'name':name, 'path':path}})
47 return self._req('PATCH', id, body)
48 return self._req('PATCH', id, body)
48
49
49 def delete(self, id):
50 def delete(self, id):
50 return self._req('DELETE', id)
51 return self._req('DELETE', id)
51
52
52 class SessionAPITest(NotebookTestBase):
53 class SessionAPITest(NotebookTestBase):
53 """Test the sessions web service API"""
54 """Test the sessions web service API"""
54 def setUp(self):
55 def setUp(self):
55 nbdir = self.notebook_dir.name
56 nbdir = self.notebook_dir.name
56 try:
57 try:
57 os.mkdir(pjoin(nbdir, 'foo'))
58 os.mkdir(pjoin(nbdir, 'foo'))
58 except OSError as e:
59 except OSError as e:
59 # Deleting the folder in an earlier test may have failed
60 # Deleting the folder in an earlier test may have failed
60 if e.errno != errno.EEXIST:
61 if e.errno != errno.EEXIST:
61 raise
62 raise
62
63
63 with io.open(pjoin(nbdir, 'foo', 'nb1.ipynb'), 'w',
64 with io.open(pjoin(nbdir, 'foo', 'nb1.ipynb'), 'w',
64 encoding='utf-8') as f:
65 encoding='utf-8') as f:
65 nb = new_notebook()
66 nb = new_notebook()
66 write(nb, f, format='ipynb')
67 write(f, nb, version=4)
67
68
68 self.sess_api = SessionAPI(self.base_url())
69 self.sess_api = SessionAPI(self.base_url())
69
70
70 def tearDown(self):
71 def tearDown(self):
71 for session in self.sess_api.list().json():
72 for session in self.sess_api.list().json():
72 self.sess_api.delete(session['id'])
73 self.sess_api.delete(session['id'])
73 shutil.rmtree(pjoin(self.notebook_dir.name, 'foo'),
74 shutil.rmtree(pjoin(self.notebook_dir.name, 'foo'),
74 ignore_errors=True)
75 ignore_errors=True)
75
76
76 def test_create(self):
77 def test_create(self):
77 sessions = self.sess_api.list().json()
78 sessions = self.sess_api.list().json()
78 self.assertEqual(len(sessions), 0)
79 self.assertEqual(len(sessions), 0)
79
80
80 resp = self.sess_api.create('nb1.ipynb', 'foo')
81 resp = self.sess_api.create('nb1.ipynb', 'foo')
81 self.assertEqual(resp.status_code, 201)
82 self.assertEqual(resp.status_code, 201)
82 newsession = resp.json()
83 newsession = resp.json()
83 self.assertIn('id', newsession)
84 self.assertIn('id', newsession)
84 self.assertEqual(newsession['notebook']['name'], 'nb1.ipynb')
85 self.assertEqual(newsession['notebook']['name'], 'nb1.ipynb')
85 self.assertEqual(newsession['notebook']['path'], 'foo')
86 self.assertEqual(newsession['notebook']['path'], 'foo')
86 self.assertEqual(resp.headers['Location'], '/api/sessions/{0}'.format(newsession['id']))
87 self.assertEqual(resp.headers['Location'], '/api/sessions/{0}'.format(newsession['id']))
87
88
88 sessions = self.sess_api.list().json()
89 sessions = self.sess_api.list().json()
89 self.assertEqual(sessions, [newsession])
90 self.assertEqual(sessions, [newsession])
90
91
91 # Retrieve it
92 # Retrieve it
92 sid = newsession['id']
93 sid = newsession['id']
93 got = self.sess_api.get(sid).json()
94 got = self.sess_api.get(sid).json()
94 self.assertEqual(got, newsession)
95 self.assertEqual(got, newsession)
95
96
96 def test_delete(self):
97 def test_delete(self):
97 newsession = self.sess_api.create('nb1.ipynb', 'foo').json()
98 newsession = self.sess_api.create('nb1.ipynb', 'foo').json()
98 sid = newsession['id']
99 sid = newsession['id']
99
100
100 resp = self.sess_api.delete(sid)
101 resp = self.sess_api.delete(sid)
101 self.assertEqual(resp.status_code, 204)
102 self.assertEqual(resp.status_code, 204)
102
103
103 sessions = self.sess_api.list().json()
104 sessions = self.sess_api.list().json()
104 self.assertEqual(sessions, [])
105 self.assertEqual(sessions, [])
105
106
106 with assert_http_error(404):
107 with assert_http_error(404):
107 self.sess_api.get(sid)
108 self.sess_api.get(sid)
108
109
109 def test_modify(self):
110 def test_modify(self):
110 newsession = self.sess_api.create('nb1.ipynb', 'foo').json()
111 newsession = self.sess_api.create('nb1.ipynb', 'foo').json()
111 sid = newsession['id']
112 sid = newsession['id']
112
113
113 changed = self.sess_api.modify(sid, 'nb2.ipynb', '').json()
114 changed = self.sess_api.modify(sid, 'nb2.ipynb', '').json()
114 self.assertEqual(changed['id'], sid)
115 self.assertEqual(changed['id'], sid)
115 self.assertEqual(changed['notebook']['name'], 'nb2.ipynb')
116 self.assertEqual(changed['notebook']['name'], 'nb2.ipynb')
116 self.assertEqual(changed['notebook']['path'], '')
117 self.assertEqual(changed['notebook']['path'], '')
@@ -1,151 +1,152
1 # coding: utf-8
1 # coding: utf-8
2 """Test the /files/ handler."""
2 """Test the /files/ handler."""
3
3
4 import io
4 import io
5 import os
5 import os
6 from unicodedata import normalize
6 from unicodedata import normalize
7
7
8 pjoin = os.path.join
8 pjoin = os.path.join
9
9
10 import requests
10 import requests
11 import json
11 import json
12
12
13 from IPython.nbformat.current import (new_notebook, write,
13 from IPython.nbformat import write
14 from IPython.nbformat.v4 import (new_notebook,
14 new_markdown_cell, new_code_cell,
15 new_markdown_cell, new_code_cell,
15 new_output)
16 new_output)
16
17
17 from IPython.html.utils import url_path_join
18 from IPython.html.utils import url_path_join
18 from .launchnotebook import NotebookTestBase
19 from .launchnotebook import NotebookTestBase
19 from IPython.utils import py3compat
20 from IPython.utils import py3compat
20
21
21
22
22 class FilesTest(NotebookTestBase):
23 class FilesTest(NotebookTestBase):
23 def test_hidden_files(self):
24 def test_hidden_files(self):
24 not_hidden = [
25 not_hidden = [
25 u'å b',
26 u'å b',
26 u'å b/ç. d',
27 u'å b/ç. d',
27 ]
28 ]
28 hidden = [
29 hidden = [
29 u'.å b',
30 u'.å b',
30 u'å b/.ç d',
31 u'å b/.ç d',
31 ]
32 ]
32 dirs = not_hidden + hidden
33 dirs = not_hidden + hidden
33
34
34 nbdir = self.notebook_dir.name
35 nbdir = self.notebook_dir.name
35 for d in dirs:
36 for d in dirs:
36 path = pjoin(nbdir, d.replace('/', os.sep))
37 path = pjoin(nbdir, d.replace('/', os.sep))
37 if not os.path.exists(path):
38 if not os.path.exists(path):
38 os.mkdir(path)
39 os.mkdir(path)
39 with open(pjoin(path, 'foo'), 'w') as f:
40 with open(pjoin(path, 'foo'), 'w') as f:
40 f.write('foo')
41 f.write('foo')
41 with open(pjoin(path, '.foo'), 'w') as f:
42 with open(pjoin(path, '.foo'), 'w') as f:
42 f.write('.foo')
43 f.write('.foo')
43 url = self.base_url()
44 url = self.base_url()
44
45
45 for d in not_hidden:
46 for d in not_hidden:
46 path = pjoin(nbdir, d.replace('/', os.sep))
47 path = pjoin(nbdir, d.replace('/', os.sep))
47 r = requests.get(url_path_join(url, 'files', d, 'foo'))
48 r = requests.get(url_path_join(url, 'files', d, 'foo'))
48 r.raise_for_status()
49 r.raise_for_status()
49 self.assertEqual(r.text, 'foo')
50 self.assertEqual(r.text, 'foo')
50 r = requests.get(url_path_join(url, 'files', d, '.foo'))
51 r = requests.get(url_path_join(url, 'files', d, '.foo'))
51 self.assertEqual(r.status_code, 404)
52 self.assertEqual(r.status_code, 404)
52
53
53 for d in hidden:
54 for d in hidden:
54 path = pjoin(nbdir, d.replace('/', os.sep))
55 path = pjoin(nbdir, d.replace('/', os.sep))
55 for foo in ('foo', '.foo'):
56 for foo in ('foo', '.foo'):
56 r = requests.get(url_path_join(url, 'files', d, foo))
57 r = requests.get(url_path_join(url, 'files', d, foo))
57 self.assertEqual(r.status_code, 404)
58 self.assertEqual(r.status_code, 404)
58
59
59 def test_contents_manager(self):
60 def test_contents_manager(self):
60 "make sure ContentsManager returns right files (ipynb, bin, txt)."
61 "make sure ContentsManager returns right files (ipynb, bin, txt)."
61
62
62 nbdir = self.notebook_dir.name
63 nbdir = self.notebook_dir.name
63 base = self.base_url()
64 base = self.base_url()
64
65
65 nb = new_notebook(
66 nb = new_notebook(
66 cells=[
67 cells=[
67 new_markdown_cell(u'Created by test ³'),
68 new_markdown_cell(u'Created by test ³'),
68 new_code_cell("print(2*6)", outputs=[
69 new_code_cell("print(2*6)", outputs=[
69 new_output("stream", text="12"),
70 new_output("stream", text="12"),
70 ])
71 ])
71 ]
72 ]
72 )
73 )
73
74
74 with io.open(pjoin(nbdir, 'testnb.ipynb'), 'w',
75 with io.open(pjoin(nbdir, 'testnb.ipynb'), 'w',
75 encoding='utf-8') as f:
76 encoding='utf-8') as f:
76 write(nb, f)
77 write(f, nb, version=4)
77
78
78 with io.open(pjoin(nbdir, 'test.bin'), 'wb') as f:
79 with io.open(pjoin(nbdir, 'test.bin'), 'wb') as f:
79 f.write(b'\xff' + os.urandom(5))
80 f.write(b'\xff' + os.urandom(5))
80 f.close()
81 f.close()
81
82
82 with io.open(pjoin(nbdir, 'test.txt'), 'w') as f:
83 with io.open(pjoin(nbdir, 'test.txt'), 'w') as f:
83 f.write(u'foobar')
84 f.write(u'foobar')
84 f.close()
85 f.close()
85
86
86 r = requests.get(url_path_join(base, 'files', 'testnb.ipynb'))
87 r = requests.get(url_path_join(base, 'files', 'testnb.ipynb'))
87 self.assertEqual(r.status_code, 200)
88 self.assertEqual(r.status_code, 200)
88 self.assertIn('print(2*6)', r.text)
89 self.assertIn('print(2*6)', r.text)
89 json.loads(r.text)
90 json.loads(r.text)
90
91
91 r = requests.get(url_path_join(base, 'files', 'test.bin'))
92 r = requests.get(url_path_join(base, 'files', 'test.bin'))
92 self.assertEqual(r.status_code, 200)
93 self.assertEqual(r.status_code, 200)
93 self.assertEqual(r.headers['content-type'], 'application/octet-stream')
94 self.assertEqual(r.headers['content-type'], 'application/octet-stream')
94 self.assertEqual(r.content[:1], b'\xff')
95 self.assertEqual(r.content[:1], b'\xff')
95 self.assertEqual(len(r.content), 6)
96 self.assertEqual(len(r.content), 6)
96
97
97 r = requests.get(url_path_join(base, 'files', 'test.txt'))
98 r = requests.get(url_path_join(base, 'files', 'test.txt'))
98 self.assertEqual(r.status_code, 200)
99 self.assertEqual(r.status_code, 200)
99 self.assertEqual(r.headers['content-type'], 'text/plain')
100 self.assertEqual(r.headers['content-type'], 'text/plain')
100 self.assertEqual(r.text, 'foobar')
101 self.assertEqual(r.text, 'foobar')
101
102
102 def test_download(self):
103 def test_download(self):
103 nbdir = self.notebook_dir.name
104 nbdir = self.notebook_dir.name
104 base = self.base_url()
105 base = self.base_url()
105
106
106 text = 'hello'
107 text = 'hello'
107 with open(pjoin(nbdir, 'test.txt'), 'w') as f:
108 with open(pjoin(nbdir, 'test.txt'), 'w') as f:
108 f.write(text)
109 f.write(text)
109
110
110 r = requests.get(url_path_join(base, 'files', 'test.txt'))
111 r = requests.get(url_path_join(base, 'files', 'test.txt'))
111 disposition = r.headers.get('Content-Disposition', '')
112 disposition = r.headers.get('Content-Disposition', '')
112 self.assertNotIn('attachment', disposition)
113 self.assertNotIn('attachment', disposition)
113
114
114 r = requests.get(url_path_join(base, 'files', 'test.txt') + '?download=1')
115 r = requests.get(url_path_join(base, 'files', 'test.txt') + '?download=1')
115 disposition = r.headers.get('Content-Disposition', '')
116 disposition = r.headers.get('Content-Disposition', '')
116 self.assertIn('attachment', disposition)
117 self.assertIn('attachment', disposition)
117 self.assertIn('filename="test.txt"', disposition)
118 self.assertIn('filename="test.txt"', disposition)
118
119
119 def test_old_files_redirect(self):
120 def test_old_files_redirect(self):
120 """pre-2.0 'files/' prefixed links are properly redirected"""
121 """pre-2.0 'files/' prefixed links are properly redirected"""
121 nbdir = self.notebook_dir.name
122 nbdir = self.notebook_dir.name
122 base = self.base_url()
123 base = self.base_url()
123
124
124 os.mkdir(pjoin(nbdir, 'files'))
125 os.mkdir(pjoin(nbdir, 'files'))
125 os.makedirs(pjoin(nbdir, 'sub', 'files'))
126 os.makedirs(pjoin(nbdir, 'sub', 'files'))
126
127
127 for prefix in ('', 'sub'):
128 for prefix in ('', 'sub'):
128 with open(pjoin(nbdir, prefix, 'files', 'f1.txt'), 'w') as f:
129 with open(pjoin(nbdir, prefix, 'files', 'f1.txt'), 'w') as f:
129 f.write(prefix + '/files/f1')
130 f.write(prefix + '/files/f1')
130 with open(pjoin(nbdir, prefix, 'files', 'f2.txt'), 'w') as f:
131 with open(pjoin(nbdir, prefix, 'files', 'f2.txt'), 'w') as f:
131 f.write(prefix + '/files/f2')
132 f.write(prefix + '/files/f2')
132 with open(pjoin(nbdir, prefix, 'f2.txt'), 'w') as f:
133 with open(pjoin(nbdir, prefix, 'f2.txt'), 'w') as f:
133 f.write(prefix + '/f2')
134 f.write(prefix + '/f2')
134 with open(pjoin(nbdir, prefix, 'f3.txt'), 'w') as f:
135 with open(pjoin(nbdir, prefix, 'f3.txt'), 'w') as f:
135 f.write(prefix + '/f3')
136 f.write(prefix + '/f3')
136
137
137 url = url_path_join(base, 'notebooks', prefix, 'files', 'f1.txt')
138 url = url_path_join(base, 'notebooks', prefix, 'files', 'f1.txt')
138 r = requests.get(url)
139 r = requests.get(url)
139 self.assertEqual(r.status_code, 200)
140 self.assertEqual(r.status_code, 200)
140 self.assertEqual(r.text, prefix + '/files/f1')
141 self.assertEqual(r.text, prefix + '/files/f1')
141
142
142 url = url_path_join(base, 'notebooks', prefix, 'files', 'f2.txt')
143 url = url_path_join(base, 'notebooks', prefix, 'files', 'f2.txt')
143 r = requests.get(url)
144 r = requests.get(url)
144 self.assertEqual(r.status_code, 200)
145 self.assertEqual(r.status_code, 200)
145 self.assertEqual(r.text, prefix + '/files/f2')
146 self.assertEqual(r.text, prefix + '/files/f2')
146
147
147 url = url_path_join(base, 'notebooks', prefix, 'files', 'f3.txt')
148 url = url_path_join(base, 'notebooks', prefix, 'files', 'f3.txt')
148 r = requests.get(url)
149 r = requests.get(url)
149 self.assertEqual(r.status_code, 200)
150 self.assertEqual(r.status_code, 200)
150 self.assertEqual(r.text, prefix + '/f3')
151 self.assertEqual(r.text, prefix + '/f3')
151
152
General Comments 0
You need to be logged in to leave comments. Login now