##// END OF EJS Templates
don't use nbformat.current in IPython.html...
MinRK -
Show More
@@ -1,148 +1,148
1 1 """Tornado handlers for nbconvert."""
2 2
3 3 # Copyright (c) IPython Development Team.
4 4 # Distributed under the terms of the Modified BSD License.
5 5
6 6 import io
7 7 import os
8 8 import zipfile
9 9
10 10 from tornado import web
11 11
12 12 from ..base.handlers import (
13 13 IPythonHandler, FilesRedirectHandler,
14 14 notebook_path_regex, path_regex,
15 15 )
16 from IPython.nbformat.current import from_dict
16 from IPython.nbformat import from_dict
17 17
18 18 from IPython.utils.py3compat import cast_bytes
19 19
20 20 def find_resource_files(output_files_dir):
21 21 files = []
22 22 for dirpath, dirnames, filenames in os.walk(output_files_dir):
23 23 files.extend([os.path.join(dirpath, f) for f in filenames])
24 24 return files
25 25
26 26 def respond_zip(handler, name, output, resources):
27 27 """Zip up the output and resource files and respond with the zip file.
28 28
29 29 Returns True if it has served a zip file, False if there are no resource
30 30 files, in which case we serve the plain output file.
31 31 """
32 32 # Check if we have resource files we need to zip
33 33 output_files = resources.get('outputs', None)
34 34 if not output_files:
35 35 return False
36 36
37 37 # Headers
38 38 zip_filename = os.path.splitext(name)[0] + '.zip'
39 39 handler.set_header('Content-Disposition',
40 40 'attachment; filename="%s"' % zip_filename)
41 41 handler.set_header('Content-Type', 'application/zip')
42 42
43 43 # Prepare the zip file
44 44 buffer = io.BytesIO()
45 45 zipf = zipfile.ZipFile(buffer, mode='w', compression=zipfile.ZIP_DEFLATED)
46 46 output_filename = os.path.splitext(name)[0] + '.' + resources['output_extension']
47 47 zipf.writestr(output_filename, cast_bytes(output, 'utf-8'))
48 48 for filename, data in output_files.items():
49 49 zipf.writestr(os.path.basename(filename), data)
50 50 zipf.close()
51 51
52 52 handler.finish(buffer.getvalue())
53 53 return True
54 54
55 55 def get_exporter(format, **kwargs):
56 56 """get an exporter, raising appropriate errors"""
57 57 # if this fails, will raise 500
58 58 try:
59 59 from IPython.nbconvert.exporters.export import exporter_map
60 60 except ImportError as e:
61 61 raise web.HTTPError(500, "Could not import nbconvert: %s" % e)
62 62
63 63 try:
64 64 Exporter = exporter_map[format]
65 65 except KeyError:
66 66 # should this be 400?
67 67 raise web.HTTPError(404, u"No exporter for format: %s" % format)
68 68
69 69 try:
70 70 return Exporter(**kwargs)
71 71 except Exception as e:
72 72 raise web.HTTPError(500, "Could not construct Exporter: %s" % e)
73 73
74 74 class NbconvertFileHandler(IPythonHandler):
75 75
76 76 SUPPORTED_METHODS = ('GET',)
77 77
78 78 @web.authenticated
79 79 def get(self, format, path='', name=None):
80 80
81 81 exporter = get_exporter(format, config=self.config, log=self.log)
82 82
83 83 path = path.strip('/')
84 84 model = self.contents_manager.get_model(name=name, path=path)
85 85
86 86 self.set_header('Last-Modified', model['last_modified'])
87 87
88 88 try:
89 89 output, resources = exporter.from_notebook_node(model['content'])
90 90 except Exception as e:
91 91 raise web.HTTPError(500, "nbconvert failed: %s" % e)
92 92
93 93 if respond_zip(self, name, output, resources):
94 94 return
95 95
96 96 # Force download if requested
97 97 if self.get_argument('download', 'false').lower() == 'true':
98 98 filename = os.path.splitext(name)[0] + '.' + resources['output_extension']
99 99 self.set_header('Content-Disposition',
100 100 'attachment; filename="%s"' % filename)
101 101
102 102 # MIME type
103 103 if exporter.output_mimetype:
104 104 self.set_header('Content-Type',
105 105 '%s; charset=utf-8' % exporter.output_mimetype)
106 106
107 107 self.finish(output)
108 108
109 109 class NbconvertPostHandler(IPythonHandler):
110 110 SUPPORTED_METHODS = ('POST',)
111 111
112 112 @web.authenticated
113 113 def post(self, format):
114 114 exporter = get_exporter(format, config=self.config)
115 115
116 116 model = self.get_json_body()
117 117 name = model.get('name', 'notebook.ipynb')
118 118 nbnode = from_dict(model['content'])
119 119
120 120 try:
121 121 output, resources = exporter.from_notebook_node(nbnode)
122 122 except Exception as e:
123 123 raise web.HTTPError(500, "nbconvert failed: %s" % e)
124 124
125 125 if respond_zip(self, name, output, resources):
126 126 return
127 127
128 128 # MIME type
129 129 if exporter.output_mimetype:
130 130 self.set_header('Content-Type',
131 131 '%s; charset=utf-8' % exporter.output_mimetype)
132 132
133 133 self.finish(output)
134 134
135 135
136 136 #-----------------------------------------------------------------------------
137 137 # URL to handler mappings
138 138 #-----------------------------------------------------------------------------
139 139
140 140 _format_regex = r"(?P<format>\w+)"
141 141
142 142
143 143 default_handlers = [
144 144 (r"/nbconvert/%s%s" % (_format_regex, notebook_path_regex),
145 145 NbconvertFileHandler),
146 146 (r"/nbconvert/%s" % _format_regex, NbconvertPostHandler),
147 147 (r"/nbconvert/html%s" % path_regex, FilesRedirectHandler),
148 148 ]
@@ -1,131 +1,132
1 1 # coding: utf-8
2 2 import base64
3 3 import io
4 4 import json
5 5 import os
6 6 from os.path import join as pjoin
7 7 import shutil
8 8
9 9 import requests
10 10
11 11 from IPython.html.utils import url_path_join
12 12 from IPython.html.tests.launchnotebook import NotebookTestBase, assert_http_error
13 from IPython.nbformat.current import (new_notebook, write,
14 new_markdown_cell, new_code_cell,
15 new_output)
13 from IPython.nbformat import write
14 from IPython.nbformat.v4 import (
15 new_notebook, new_markdown_cell, new_code_cell, new_output,
16 )
16 17
17 18 from IPython.testing.decorators import onlyif_cmds_exist
18 19
19 20
20 21 class NbconvertAPI(object):
21 22 """Wrapper for nbconvert API calls."""
22 23 def __init__(self, base_url):
23 24 self.base_url = base_url
24 25
25 26 def _req(self, verb, path, body=None, params=None):
26 27 response = requests.request(verb,
27 28 url_path_join(self.base_url, 'nbconvert', path),
28 29 data=body, params=params,
29 30 )
30 31 response.raise_for_status()
31 32 return response
32 33
33 34 def from_file(self, format, path, name, download=False):
34 35 return self._req('GET', url_path_join(format, path, name),
35 36 params={'download':download})
36 37
37 38 def from_post(self, format, nbmodel):
38 39 body = json.dumps(nbmodel)
39 40 return self._req('POST', format, body)
40 41
41 42 def list_formats(self):
42 43 return self._req('GET', '')
43 44
44 45 png_green_pixel = base64.encodestring(b'\x89PNG\r\n\x1a\n\x00\x00\x00\rIHDR\x00'
45 46 b'\x00\x00\x01\x00\x00x00\x01\x08\x02\x00\x00\x00\x90wS\xde\x00\x00\x00\x0cIDAT'
46 47 b'\x08\xd7c\x90\xfb\xcf\x00\x00\x02\\\x01\x1e.~d\x87\x00\x00\x00\x00IEND\xaeB`\x82'
47 48 ).decode('ascii')
48 49
49 50 class APITest(NotebookTestBase):
50 51 def setUp(self):
51 52 nbdir = self.notebook_dir.name
52 53
53 54 if not os.path.isdir(pjoin(nbdir, 'foo')):
54 55 os.mkdir(pjoin(nbdir, 'foo'))
55 56
56 57 nb = new_notebook()
57 58
58 59 nb.cells.append(new_markdown_cell(u'Created by test ³'))
59 60 cc1 = new_code_cell(source=u'print(2*6)')
60 61 cc1.outputs.append(new_output(output_type="stream", text=u'12'))
61 62 cc1.outputs.append(new_output(output_type="execute_result",
62 63 data={'image/png' : png_green_pixel},
63 64 execution_count=1,
64 65 ))
65 66 nb.cells.append(cc1)
66 67
67 68 with io.open(pjoin(nbdir, 'foo', 'testnb.ipynb'), 'w',
68 69 encoding='utf-8') as f:
69 write(nb, f)
70 write(f, nb, version=4)
70 71
71 72 self.nbconvert_api = NbconvertAPI(self.base_url())
72 73
73 74 def tearDown(self):
74 75 nbdir = self.notebook_dir.name
75 76
76 77 for dname in ['foo']:
77 78 shutil.rmtree(pjoin(nbdir, dname), ignore_errors=True)
78 79
79 80 @onlyif_cmds_exist('pandoc')
80 81 def test_from_file(self):
81 82 r = self.nbconvert_api.from_file('html', 'foo', 'testnb.ipynb')
82 83 self.assertEqual(r.status_code, 200)
83 84 self.assertIn(u'text/html', r.headers['Content-Type'])
84 85 self.assertIn(u'Created by test', r.text)
85 86 self.assertIn(u'print', r.text)
86 87
87 88 r = self.nbconvert_api.from_file('python', 'foo', 'testnb.ipynb')
88 89 self.assertIn(u'text/x-python', r.headers['Content-Type'])
89 90 self.assertIn(u'print(2*6)', r.text)
90 91
91 92 @onlyif_cmds_exist('pandoc')
92 93 def test_from_file_404(self):
93 94 with assert_http_error(404):
94 95 self.nbconvert_api.from_file('html', 'foo', 'thisdoesntexist.ipynb')
95 96
96 97 @onlyif_cmds_exist('pandoc')
97 98 def test_from_file_download(self):
98 99 r = self.nbconvert_api.from_file('python', 'foo', 'testnb.ipynb', download=True)
99 100 content_disposition = r.headers['Content-Disposition']
100 101 self.assertIn('attachment', content_disposition)
101 102 self.assertIn('testnb.py', content_disposition)
102 103
103 104 @onlyif_cmds_exist('pandoc')
104 105 def test_from_file_zip(self):
105 106 r = self.nbconvert_api.from_file('latex', 'foo', 'testnb.ipynb', download=True)
106 107 self.assertIn(u'application/zip', r.headers['Content-Type'])
107 108 self.assertIn(u'.zip', r.headers['Content-Disposition'])
108 109
109 110 @onlyif_cmds_exist('pandoc')
110 111 def test_from_post(self):
111 112 nbmodel_url = url_path_join(self.base_url(), 'api/contents/foo/testnb.ipynb')
112 113 nbmodel = requests.get(nbmodel_url).json()
113 114
114 115 r = self.nbconvert_api.from_post(format='html', nbmodel=nbmodel)
115 116 self.assertEqual(r.status_code, 200)
116 117 self.assertIn(u'text/html', r.headers['Content-Type'])
117 118 self.assertIn(u'Created by test', r.text)
118 119 self.assertIn(u'print', r.text)
119 120
120 121 r = self.nbconvert_api.from_post(format='python', nbmodel=nbmodel)
121 122 self.assertIn(u'text/x-python', r.headers['Content-Type'])
122 123 self.assertIn(u'print(2*6)', r.text)
123 124
124 125 @onlyif_cmds_exist('pandoc')
125 126 def test_from_post_zip(self):
126 127 nbmodel_url = url_path_join(self.base_url(), 'api/contents/foo/testnb.ipynb')
127 128 nbmodel = requests.get(nbmodel_url).json()
128 129
129 130 r = self.nbconvert_api.from_post(format='latex', nbmodel=nbmodel)
130 131 self.assertIn(u'application/zip', r.headers['Content-Type'])
131 132 self.assertIn(u'.zip', r.headers['Content-Disposition'])
@@ -1,545 +1,545
1 1 """A contents manager that uses the local file system for storage."""
2 2
3 3 # Copyright (c) IPython Development Team.
4 4 # Distributed under the terms of the Modified BSD License.
5 5
6 6 import base64
7 7 import io
8 8 import os
9 9 import glob
10 10 import shutil
11 11
12 12 from tornado import web
13 13
14 14 from .manager import ContentsManager
15 from IPython.nbformat import current
15 from IPython import nbformat
16 16 from IPython.utils.io import atomic_writing
17 17 from IPython.utils.path import ensure_dir_exists
18 18 from IPython.utils.traitlets import Unicode, Bool, TraitError
19 19 from IPython.utils.py3compat import getcwd
20 20 from IPython.utils import tz
21 21 from IPython.html.utils import is_hidden, to_os_path, url_path_join
22 22
23 23
24 24 class FileContentsManager(ContentsManager):
25 25
26 26 root_dir = Unicode(getcwd(), config=True)
27 27
28 28 save_script = Bool(False, config=True, help='DEPRECATED, IGNORED')
29 29 def _save_script_changed(self):
30 30 self.log.warn("""
31 31 Automatically saving notebooks as scripts has been removed.
32 32 Use `ipython nbconvert --to python [notebook]` instead.
33 33 """)
34 34
35 35 def _root_dir_changed(self, name, old, new):
36 36 """Do a bit of validation of the root_dir."""
37 37 if not os.path.isabs(new):
38 38 # If we receive a non-absolute path, make it absolute.
39 39 self.root_dir = os.path.abspath(new)
40 40 return
41 41 if not os.path.isdir(new):
42 42 raise TraitError("%r is not a directory" % new)
43 43
44 44 checkpoint_dir = Unicode('.ipynb_checkpoints', config=True,
45 45 help="""The directory name in which to keep file checkpoints
46 46
47 47 This is a path relative to the file's own directory.
48 48
49 49 By default, it is .ipynb_checkpoints
50 50 """
51 51 )
52 52
53 53 def _copy(self, src, dest):
54 54 """copy src to dest
55 55
56 56 like shutil.copy2, but log errors in copystat
57 57 """
58 58 shutil.copyfile(src, dest)
59 59 try:
60 60 shutil.copystat(src, dest)
61 61 except OSError as e:
62 62 self.log.debug("copystat on %s failed", dest, exc_info=True)
63 63
64 64 def _get_os_path(self, name=None, path=''):
65 65 """Given a filename and API path, return its file system
66 66 path.
67 67
68 68 Parameters
69 69 ----------
70 70 name : string
71 71 A filename
72 72 path : string
73 73 The relative API path to the named file.
74 74
75 75 Returns
76 76 -------
77 77 path : string
78 78 API path to be evaluated relative to root_dir.
79 79 """
80 80 if name is not None:
81 81 path = url_path_join(path, name)
82 82 return to_os_path(path, self.root_dir)
83 83
84 84 def path_exists(self, path):
85 85 """Does the API-style path refer to an extant directory?
86 86
87 87 API-style wrapper for os.path.isdir
88 88
89 89 Parameters
90 90 ----------
91 91 path : string
92 92 The path to check. This is an API path (`/` separated,
93 93 relative to root_dir).
94 94
95 95 Returns
96 96 -------
97 97 exists : bool
98 98 Whether the path is indeed a directory.
99 99 """
100 100 path = path.strip('/')
101 101 os_path = self._get_os_path(path=path)
102 102 return os.path.isdir(os_path)
103 103
104 104 def is_hidden(self, path):
105 105 """Does the API style path correspond to a hidden directory or file?
106 106
107 107 Parameters
108 108 ----------
109 109 path : string
110 110 The path to check. This is an API path (`/` separated,
111 111 relative to root_dir).
112 112
113 113 Returns
114 114 -------
115 115 exists : bool
116 116 Whether the path is hidden.
117 117
118 118 """
119 119 path = path.strip('/')
120 120 os_path = self._get_os_path(path=path)
121 121 return is_hidden(os_path, self.root_dir)
122 122
123 123 def file_exists(self, name, path=''):
124 124 """Returns True if the file exists, else returns False.
125 125
126 126 API-style wrapper for os.path.isfile
127 127
128 128 Parameters
129 129 ----------
130 130 name : string
131 131 The name of the file you are checking.
132 132 path : string
133 133 The relative path to the file's directory (with '/' as separator)
134 134
135 135 Returns
136 136 -------
137 137 exists : bool
138 138 Whether the file exists.
139 139 """
140 140 path = path.strip('/')
141 141 nbpath = self._get_os_path(name, path=path)
142 142 return os.path.isfile(nbpath)
143 143
144 144 def exists(self, name=None, path=''):
145 145 """Returns True if the path [and name] exists, else returns False.
146 146
147 147 API-style wrapper for os.path.exists
148 148
149 149 Parameters
150 150 ----------
151 151 name : string
152 152 The name of the file you are checking.
153 153 path : string
154 154 The relative path to the file's directory (with '/' as separator)
155 155
156 156 Returns
157 157 -------
158 158 exists : bool
159 159 Whether the target exists.
160 160 """
161 161 path = path.strip('/')
162 162 os_path = self._get_os_path(name, path=path)
163 163 return os.path.exists(os_path)
164 164
165 165 def _base_model(self, name, path=''):
166 166 """Build the common base of a contents model"""
167 167 os_path = self._get_os_path(name, path)
168 168 info = os.stat(os_path)
169 169 last_modified = tz.utcfromtimestamp(info.st_mtime)
170 170 created = tz.utcfromtimestamp(info.st_ctime)
171 171 # Create the base model.
172 172 model = {}
173 173 model['name'] = name
174 174 model['path'] = path
175 175 model['last_modified'] = last_modified
176 176 model['created'] = created
177 177 model['content'] = None
178 178 model['format'] = None
179 179 model['message'] = None
180 180 return model
181 181
182 182 def _dir_model(self, name, path='', content=True):
183 183 """Build a model for a directory
184 184
185 185 if content is requested, will include a listing of the directory
186 186 """
187 187 os_path = self._get_os_path(name, path)
188 188
189 189 four_o_four = u'directory does not exist: %r' % os_path
190 190
191 191 if not os.path.isdir(os_path):
192 192 raise web.HTTPError(404, four_o_four)
193 193 elif is_hidden(os_path, self.root_dir):
194 194 self.log.info("Refusing to serve hidden directory %r, via 404 Error",
195 195 os_path
196 196 )
197 197 raise web.HTTPError(404, four_o_four)
198 198
199 199 if name is None:
200 200 if '/' in path:
201 201 path, name = path.rsplit('/', 1)
202 202 else:
203 203 name = ''
204 204 model = self._base_model(name, path)
205 205 model['type'] = 'directory'
206 206 dir_path = u'{}/{}'.format(path, name)
207 207 if content:
208 208 model['content'] = contents = []
209 209 for os_path in glob.glob(self._get_os_path('*', dir_path)):
210 210 name = os.path.basename(os_path)
211 211 # skip over broken symlinks in listing
212 212 if not os.path.exists(os_path):
213 213 self.log.warn("%s doesn't exist", os_path)
214 214 continue
215 215 if self.should_list(name) and not is_hidden(os_path, self.root_dir):
216 216 contents.append(self.get_model(name=name, path=dir_path, content=False))
217 217
218 218 model['format'] = 'json'
219 219
220 220 return model
221 221
222 222 def _file_model(self, name, path='', content=True):
223 223 """Build a model for a file
224 224
225 225 if content is requested, include the file contents.
226 226 UTF-8 text files will be unicode, binary files will be base64-encoded.
227 227 """
228 228 model = self._base_model(name, path)
229 229 model['type'] = 'file'
230 230 if content:
231 231 os_path = self._get_os_path(name, path)
232 232 with io.open(os_path, 'rb') as f:
233 233 bcontent = f.read()
234 234 try:
235 235 model['content'] = bcontent.decode('utf8')
236 236 except UnicodeError as e:
237 237 model['content'] = base64.encodestring(bcontent).decode('ascii')
238 238 model['format'] = 'base64'
239 239 else:
240 240 model['format'] = 'text'
241 241 return model
242 242
243 243
244 244 def _notebook_model(self, name, path='', content=True):
245 245 """Build a notebook model
246 246
247 247 if content is requested, the notebook content will be populated
248 248 as a JSON structure (not double-serialized)
249 249 """
250 250 model = self._base_model(name, path)
251 251 model['type'] = 'notebook'
252 252 if content:
253 253 os_path = self._get_os_path(name, path)
254 254 with io.open(os_path, 'r', encoding='utf-8') as f:
255 255 try:
256 nb = current.read(f, u'json')
256 nb = nbformat.read(f, as_version=4)
257 257 except Exception as e:
258 258 raise web.HTTPError(400, u"Unreadable Notebook: %s %r" % (os_path, e))
259 259 self.mark_trusted_cells(nb, name, path)
260 260 model['content'] = nb
261 261 model['format'] = 'json'
262 262 self.validate_notebook_model(model)
263 263 return model
264 264
265 265 def get_model(self, name, path='', content=True):
266 266 """ Takes a path and name for an entity and returns its model
267 267
268 268 Parameters
269 269 ----------
270 270 name : str
271 271 the name of the target
272 272 path : str
273 273 the API path that describes the relative path for the target
274 274
275 275 Returns
276 276 -------
277 277 model : dict
278 278 the contents model. If content=True, returns the contents
279 279 of the file or directory as well.
280 280 """
281 281 path = path.strip('/')
282 282
283 283 if not self.exists(name=name, path=path):
284 284 raise web.HTTPError(404, u'No such file or directory: %s/%s' % (path, name))
285 285
286 286 os_path = self._get_os_path(name, path)
287 287 if os.path.isdir(os_path):
288 288 model = self._dir_model(name, path, content)
289 289 elif name.endswith('.ipynb'):
290 290 model = self._notebook_model(name, path, content)
291 291 else:
292 292 model = self._file_model(name, path, content)
293 293 return model
294 294
295 295 def _save_notebook(self, os_path, model, name='', path=''):
296 296 """save a notebook file"""
297 297 # Save the notebook file
298 nb = current.from_dict(model['content'])
298 nb = nbformat.from_dict(model['content'])
299 299
300 300 self.check_and_sign(nb, name, path)
301 301
302 302 if 'name' in nb['metadata']:
303 303 nb['metadata']['name'] = u''
304 304
305 305 with atomic_writing(os_path, encoding='utf-8') as f:
306 current.write(nb, f, version=nb.nbformat)
306 nbformat.write(f, nb, version=nbformat.NO_CONVERT)
307 307
308 308 def _save_file(self, os_path, model, name='', path=''):
309 309 """save a non-notebook file"""
310 310 fmt = model.get('format', None)
311 311 if fmt not in {'text', 'base64'}:
312 312 raise web.HTTPError(400, "Must specify format of file contents as 'text' or 'base64'")
313 313 try:
314 314 content = model['content']
315 315 if fmt == 'text':
316 316 bcontent = content.encode('utf8')
317 317 else:
318 318 b64_bytes = content.encode('ascii')
319 319 bcontent = base64.decodestring(b64_bytes)
320 320 except Exception as e:
321 321 raise web.HTTPError(400, u'Encoding error saving %s: %s' % (os_path, e))
322 322 with atomic_writing(os_path, text=False) as f:
323 323 f.write(bcontent)
324 324
325 325 def _save_directory(self, os_path, model, name='', path=''):
326 326 """create a directory"""
327 327 if is_hidden(os_path, self.root_dir):
328 328 raise web.HTTPError(400, u'Cannot create hidden directory %r' % os_path)
329 329 if not os.path.exists(os_path):
330 330 os.mkdir(os_path)
331 331 elif not os.path.isdir(os_path):
332 332 raise web.HTTPError(400, u'Not a directory: %s' % (os_path))
333 333 else:
334 334 self.log.debug("Directory %r already exists", os_path)
335 335
336 336 def save(self, model, name='', path=''):
337 337 """Save the file model and return the model with no content."""
338 338 path = path.strip('/')
339 339
340 340 if 'type' not in model:
341 341 raise web.HTTPError(400, u'No file type provided')
342 342 if 'content' not in model and model['type'] != 'directory':
343 343 raise web.HTTPError(400, u'No file content provided')
344 344
345 345 # One checkpoint should always exist
346 346 if self.file_exists(name, path) and not self.list_checkpoints(name, path):
347 347 self.create_checkpoint(name, path)
348 348
349 349 new_path = model.get('path', path).strip('/')
350 350 new_name = model.get('name', name)
351 351
352 352 if path != new_path or name != new_name:
353 353 self.rename(name, path, new_name, new_path)
354 354
355 355 os_path = self._get_os_path(new_name, new_path)
356 356 self.log.debug("Saving %s", os_path)
357 357 try:
358 358 if model['type'] == 'notebook':
359 359 self._save_notebook(os_path, model, new_name, new_path)
360 360 elif model['type'] == 'file':
361 361 self._save_file(os_path, model, new_name, new_path)
362 362 elif model['type'] == 'directory':
363 363 self._save_directory(os_path, model, new_name, new_path)
364 364 else:
365 365 raise web.HTTPError(400, "Unhandled contents type: %s" % model['type'])
366 366 except web.HTTPError:
367 367 raise
368 368 except Exception as e:
369 369 raise web.HTTPError(400, u'Unexpected error while saving file: %s %s' % (os_path, e))
370 370
371 371 validation_message = None
372 372 if model['type'] == 'notebook':
373 373 self.validate_notebook_model(model)
374 374 validation_message = model.get('message', None)
375 375
376 376 model = self.get_model(new_name, new_path, content=False)
377 377 if validation_message:
378 378 model['message'] = validation_message
379 379 return model
380 380
381 381 def update(self, model, name, path=''):
382 382 """Update the file's path and/or name
383 383
384 384 For use in PATCH requests, to enable renaming a file without
385 385 re-uploading its contents. Only used for renaming at the moment.
386 386 """
387 387 path = path.strip('/')
388 388 new_name = model.get('name', name)
389 389 new_path = model.get('path', path).strip('/')
390 390 if path != new_path or name != new_name:
391 391 self.rename(name, path, new_name, new_path)
392 392 model = self.get_model(new_name, new_path, content=False)
393 393 return model
394 394
395 395 def delete(self, name, path=''):
396 396 """Delete file by name and path."""
397 397 path = path.strip('/')
398 398 os_path = self._get_os_path(name, path)
399 399 rm = os.unlink
400 400 if os.path.isdir(os_path):
401 401 listing = os.listdir(os_path)
402 402 # don't delete non-empty directories (checkpoints dir doesn't count)
403 403 if listing and listing != [self.checkpoint_dir]:
404 404 raise web.HTTPError(400, u'Directory %s not empty' % os_path)
405 405 elif not os.path.isfile(os_path):
406 406 raise web.HTTPError(404, u'File does not exist: %s' % os_path)
407 407
408 408 # clear checkpoints
409 409 for checkpoint in self.list_checkpoints(name, path):
410 410 checkpoint_id = checkpoint['id']
411 411 cp_path = self.get_checkpoint_path(checkpoint_id, name, path)
412 412 if os.path.isfile(cp_path):
413 413 self.log.debug("Unlinking checkpoint %s", cp_path)
414 414 os.unlink(cp_path)
415 415
416 416 if os.path.isdir(os_path):
417 417 self.log.debug("Removing directory %s", os_path)
418 418 shutil.rmtree(os_path)
419 419 else:
420 420 self.log.debug("Unlinking file %s", os_path)
421 421 rm(os_path)
422 422
423 423 def rename(self, old_name, old_path, new_name, new_path):
424 424 """Rename a file."""
425 425 old_path = old_path.strip('/')
426 426 new_path = new_path.strip('/')
427 427 if new_name == old_name and new_path == old_path:
428 428 return
429 429
430 430 new_os_path = self._get_os_path(new_name, new_path)
431 431 old_os_path = self._get_os_path(old_name, old_path)
432 432
433 433 # Should we proceed with the move?
434 434 if os.path.isfile(new_os_path):
435 435 raise web.HTTPError(409, u'File with name already exists: %s' % new_os_path)
436 436
437 437 # Move the file
438 438 try:
439 439 shutil.move(old_os_path, new_os_path)
440 440 except Exception as e:
441 441 raise web.HTTPError(500, u'Unknown error renaming file: %s %s' % (old_os_path, e))
442 442
443 443 # Move the checkpoints
444 444 old_checkpoints = self.list_checkpoints(old_name, old_path)
445 445 for cp in old_checkpoints:
446 446 checkpoint_id = cp['id']
447 447 old_cp_path = self.get_checkpoint_path(checkpoint_id, old_name, old_path)
448 448 new_cp_path = self.get_checkpoint_path(checkpoint_id, new_name, new_path)
449 449 if os.path.isfile(old_cp_path):
450 450 self.log.debug("Renaming checkpoint %s -> %s", old_cp_path, new_cp_path)
451 451 shutil.move(old_cp_path, new_cp_path)
452 452
453 453 # Checkpoint-related utilities
454 454
455 455 def get_checkpoint_path(self, checkpoint_id, name, path=''):
456 456 """find the path to a checkpoint"""
457 457 path = path.strip('/')
458 458 basename, ext = os.path.splitext(name)
459 459 filename = u"{name}-{checkpoint_id}{ext}".format(
460 460 name=basename,
461 461 checkpoint_id=checkpoint_id,
462 462 ext=ext,
463 463 )
464 464 os_path = self._get_os_path(path=path)
465 465 cp_dir = os.path.join(os_path, self.checkpoint_dir)
466 466 ensure_dir_exists(cp_dir)
467 467 cp_path = os.path.join(cp_dir, filename)
468 468 return cp_path
469 469
470 470 def get_checkpoint_model(self, checkpoint_id, name, path=''):
471 471 """construct the info dict for a given checkpoint"""
472 472 path = path.strip('/')
473 473 cp_path = self.get_checkpoint_path(checkpoint_id, name, path)
474 474 stats = os.stat(cp_path)
475 475 last_modified = tz.utcfromtimestamp(stats.st_mtime)
476 476 info = dict(
477 477 id = checkpoint_id,
478 478 last_modified = last_modified,
479 479 )
480 480 return info
481 481
482 482 # public checkpoint API
483 483
484 484 def create_checkpoint(self, name, path=''):
485 485 """Create a checkpoint from the current state of a file"""
486 486 path = path.strip('/')
487 487 src_path = self._get_os_path(name, path)
488 488 # only the one checkpoint ID:
489 489 checkpoint_id = u"checkpoint"
490 490 cp_path = self.get_checkpoint_path(checkpoint_id, name, path)
491 491 self.log.debug("creating checkpoint for %s", name)
492 492 self._copy(src_path, cp_path)
493 493
494 494 # return the checkpoint info
495 495 return self.get_checkpoint_model(checkpoint_id, name, path)
496 496
497 497 def list_checkpoints(self, name, path=''):
498 498 """list the checkpoints for a given file
499 499
500 500 This contents manager currently only supports one checkpoint per file.
501 501 """
502 502 path = path.strip('/')
503 503 checkpoint_id = "checkpoint"
504 504 os_path = self.get_checkpoint_path(checkpoint_id, name, path)
505 505 if not os.path.exists(os_path):
506 506 return []
507 507 else:
508 508 return [self.get_checkpoint_model(checkpoint_id, name, path)]
509 509
510 510
511 511 def restore_checkpoint(self, checkpoint_id, name, path=''):
512 512 """restore a file to a checkpointed state"""
513 513 path = path.strip('/')
514 514 self.log.info("restoring %s from checkpoint %s", name, checkpoint_id)
515 515 nb_path = self._get_os_path(name, path)
516 516 cp_path = self.get_checkpoint_path(checkpoint_id, name, path)
517 517 if not os.path.isfile(cp_path):
518 518 self.log.debug("checkpoint file does not exist: %s", cp_path)
519 519 raise web.HTTPError(404,
520 520 u'checkpoint does not exist: %s-%s' % (name, checkpoint_id)
521 521 )
522 522 # ensure notebook is readable (never restore from an unreadable notebook)
523 523 if cp_path.endswith('.ipynb'):
524 524 with io.open(cp_path, 'r', encoding='utf-8') as f:
525 current.read(f, u'json')
525 nbformat.read(f, as_version=4)
526 526 self._copy(cp_path, nb_path)
527 527 self.log.debug("copying %s -> %s", cp_path, nb_path)
528 528
529 529 def delete_checkpoint(self, checkpoint_id, name, path=''):
530 530 """delete a file's checkpoint"""
531 531 path = path.strip('/')
532 532 cp_path = self.get_checkpoint_path(checkpoint_id, name, path)
533 533 if not os.path.isfile(cp_path):
534 534 raise web.HTTPError(404,
535 535 u'Checkpoint does not exist: %s%s-%s' % (path, name, checkpoint_id)
536 536 )
537 537 self.log.debug("unlinking %s", cp_path)
538 538 os.unlink(cp_path)
539 539
540 540 def info_string(self):
541 541 return "Serving notebooks from local directory: %s" % self.root_dir
542 542
543 543 def get_kernel_path(self, name, path='', model=None):
544 544 """Return the initial working dir a kernel associated with a given notebook"""
545 545 return os.path.join(self.root_dir, path)
@@ -1,345 +1,347
1 1 """A base class for contents managers."""
2 2
3 3 # Copyright (c) IPython Development Team.
4 4 # Distributed under the terms of the Modified BSD License.
5 5
6 6 from fnmatch import fnmatch
7 7 import itertools
8 8 import json
9 9 import os
10 10
11 11 from tornado.web import HTTPError
12 12
13 13 from IPython.config.configurable import LoggingConfigurable
14 from IPython.nbformat import current, sign
14 from IPython.nbformat import sign, validate, ValidationError
15 from IPython.nbformat.v4 import new_notebook
15 16 from IPython.utils.traitlets import Instance, Unicode, List
16 17
17 18
18 19 class ContentsManager(LoggingConfigurable):
19 20 """Base class for serving files and directories.
20 21
21 22 This serves any text or binary file,
22 23 as well as directories,
23 24 with special handling for JSON notebook documents.
24 25
25 26 Most APIs take a path argument,
26 27 which is always an API-style unicode path,
27 28 and always refers to a directory.
28 29
29 30 - unicode, not url-escaped
30 31 - '/'-separated
31 32 - leading and trailing '/' will be stripped
32 33 - if unspecified, path defaults to '',
33 34 indicating the root path.
34 35
35 36 name is also unicode, and refers to a specfic target:
36 37
37 38 - unicode, not url-escaped
38 39 - must not contain '/'
39 40 - It refers to an individual filename
40 41 - It may refer to a directory name,
41 42 in the case of listing or creating directories.
42 43
43 44 """
44 45
45 46 notary = Instance(sign.NotebookNotary)
46 47 def _notary_default(self):
47 48 return sign.NotebookNotary(parent=self)
48 49
49 50 hide_globs = List(Unicode, [
50 51 u'__pycache__', '*.pyc', '*.pyo',
51 52 '.DS_Store', '*.so', '*.dylib', '*~',
52 53 ], config=True, help="""
53 54 Glob patterns to hide in file and directory listings.
54 55 """)
55 56
56 57 untitled_notebook = Unicode("Untitled", config=True,
57 58 help="The base name used when creating untitled notebooks."
58 59 )
59 60
60 61 untitled_file = Unicode("untitled", config=True,
61 62 help="The base name used when creating untitled files."
62 63 )
63 64
64 65 untitled_directory = Unicode("Untitled Folder", config=True,
65 66 help="The base name used when creating untitled directories."
66 67 )
67 68
68 69 # ContentsManager API part 1: methods that must be
69 70 # implemented in subclasses.
70 71
71 72 def path_exists(self, path):
72 73 """Does the API-style path (directory) actually exist?
73 74
74 75 Like os.path.isdir
75 76
76 77 Override this method in subclasses.
77 78
78 79 Parameters
79 80 ----------
80 81 path : string
81 82 The path to check
82 83
83 84 Returns
84 85 -------
85 86 exists : bool
86 87 Whether the path does indeed exist.
87 88 """
88 89 raise NotImplementedError
89 90
90 91 def is_hidden(self, path):
91 92 """Does the API style path correspond to a hidden directory or file?
92 93
93 94 Parameters
94 95 ----------
95 96 path : string
96 97 The path to check. This is an API path (`/` separated,
97 98 relative to root dir).
98 99
99 100 Returns
100 101 -------
101 102 hidden : bool
102 103 Whether the path is hidden.
103 104
104 105 """
105 106 raise NotImplementedError
106 107
107 108 def file_exists(self, name, path=''):
108 109 """Does a file exist at the given name and path?
109 110
110 111 Like os.path.isfile
111 112
112 113 Override this method in subclasses.
113 114
114 115 Parameters
115 116 ----------
116 117 name : string
117 118 The name of the file you are checking.
118 119 path : string
119 120 The relative path to the file's directory (with '/' as separator)
120 121
121 122 Returns
122 123 -------
123 124 exists : bool
124 125 Whether the file exists.
125 126 """
126 127 raise NotImplementedError('must be implemented in a subclass')
127 128
128 129 def exists(self, name, path=''):
129 130 """Does a file or directory exist at the given name and path?
130 131
131 132 Like os.path.exists
132 133
133 134 Parameters
134 135 ----------
135 136 name : string
136 137 The name of the file you are checking.
137 138 path : string
138 139 The relative path to the file's directory (with '/' as separator)
139 140
140 141 Returns
141 142 -------
142 143 exists : bool
143 144 Whether the target exists.
144 145 """
145 146 return self.file_exists(name, path) or self.path_exists("%s/%s" % (path, name))
146 147
147 148 def get_model(self, name, path='', content=True):
148 149 """Get the model of a file or directory with or without content."""
149 150 raise NotImplementedError('must be implemented in a subclass')
150 151
151 152 def save(self, model, name, path=''):
152 153 """Save the file or directory and return the model with no content."""
153 154 raise NotImplementedError('must be implemented in a subclass')
154 155
155 156 def update(self, model, name, path=''):
156 157 """Update the file or directory and return the model with no content.
157 158
158 159 For use in PATCH requests, to enable renaming a file without
159 160 re-uploading its contents. Only used for renaming at the moment.
160 161 """
161 162 raise NotImplementedError('must be implemented in a subclass')
162 163
163 164 def delete(self, name, path=''):
164 165 """Delete file or directory by name and path."""
165 166 raise NotImplementedError('must be implemented in a subclass')
166 167
167 168 def create_checkpoint(self, name, path=''):
168 169 """Create a checkpoint of the current state of a file
169 170
170 171 Returns a checkpoint_id for the new checkpoint.
171 172 """
172 173 raise NotImplementedError("must be implemented in a subclass")
173 174
174 175 def list_checkpoints(self, name, path=''):
175 176 """Return a list of checkpoints for a given file"""
176 177 return []
177 178
178 179 def restore_checkpoint(self, checkpoint_id, name, path=''):
179 180 """Restore a file from one of its checkpoints"""
180 181 raise NotImplementedError("must be implemented in a subclass")
181 182
182 183 def delete_checkpoint(self, checkpoint_id, name, path=''):
183 184 """delete a checkpoint for a file"""
184 185 raise NotImplementedError("must be implemented in a subclass")
185 186
186 187 # ContentsManager API part 2: methods that have useable default
187 188 # implementations, but can be overridden in subclasses.
188 189
189 190 def info_string(self):
190 191 return "Serving contents"
191 192
192 193 def get_kernel_path(self, name, path='', model=None):
193 194 """ Return the path to start kernel in """
194 195 return path
195 196
196 197 def increment_filename(self, filename, path=''):
197 198 """Increment a filename until it is unique.
198 199
199 200 Parameters
200 201 ----------
201 202 filename : unicode
202 203 The name of a file, including extension
203 204 path : unicode
204 205 The API path of the target's directory
205 206
206 207 Returns
207 208 -------
208 209 name : unicode
209 210 A filename that is unique, based on the input filename.
210 211 """
211 212 path = path.strip('/')
212 213 basename, ext = os.path.splitext(filename)
213 214 for i in itertools.count():
214 215 name = u'{basename}{i}{ext}'.format(basename=basename, i=i,
215 216 ext=ext)
216 217 if not self.file_exists(name, path):
217 218 break
218 219 return name
219 220
220 221 def validate_notebook_model(self, model):
221 222 """Add failed-validation message to model"""
222 223 try:
223 current.validate(model['content'])
224 except current.ValidationError as e:
224 validate(model['content'])
225 except ValidationError as e:
225 226 model['message'] = 'Notebook Validation failed: {}:\n{}'.format(
226 227 e.message, json.dumps(e.instance, indent=1, default=lambda obj: '<UNKNOWN>'),
227 228 )
228 229 return model
229 230
230 231 def create_file(self, model=None, path='', ext='.ipynb'):
231 232 """Create a new file or directory and return its model with no content."""
232 233 path = path.strip('/')
233 234 if model is None:
234 235 model = {}
235 236 if 'content' not in model and model.get('type', None) != 'directory':
236 237 if ext == '.ipynb':
237 model['content'] = current.new_notebook()
238 model['content'] = new_notebook()
238 239 model['type'] = 'notebook'
239 240 model['format'] = 'json'
240 241 else:
241 242 model['content'] = ''
242 243 model['type'] = 'file'
243 244 model['format'] = 'text'
244 245 if 'name' not in model:
245 246 if model['type'] == 'directory':
246 247 untitled = self.untitled_directory
247 248 elif model['type'] == 'notebook':
248 249 untitled = self.untitled_notebook
249 250 elif model['type'] == 'file':
250 251 untitled = self.untitled_file
251 252 else:
252 253 raise HTTPError(400, "Unexpected model type: %r" % model['type'])
253 254 model['name'] = self.increment_filename(untitled + ext, path)
254 255
255 256 model['path'] = path
256 257 model = self.save(model, model['name'], model['path'])
257 258 return model
258 259
259 260 def copy(self, from_name, to_name=None, path=''):
260 261 """Copy an existing file and return its new model.
261 262
262 263 If to_name not specified, increment `from_name-Copy#.ext`.
263 264
264 265 copy_from can be a full path to a file,
265 266 or just a base name. If a base name, `path` is used.
266 267 """
267 268 path = path.strip('/')
268 269 if '/' in from_name:
269 270 from_path, from_name = from_name.rsplit('/', 1)
270 271 else:
271 272 from_path = path
272 273 model = self.get_model(from_name, from_path)
273 274 if model['type'] == 'directory':
274 275 raise HTTPError(400, "Can't copy directories")
275 276 if not to_name:
276 277 base, ext = os.path.splitext(from_name)
277 278 copy_name = u'{0}-Copy{1}'.format(base, ext)
278 279 to_name = self.increment_filename(copy_name, path)
279 280 model['name'] = to_name
280 281 model['path'] = path
281 282 model = self.save(model, to_name, path)
282 283 return model
283 284
284 285 def log_info(self):
285 286 self.log.info(self.info_string())
286 287
287 288 def trust_notebook(self, name, path=''):
288 289 """Explicitly trust a notebook
289 290
290 291 Parameters
291 292 ----------
292 293 name : string
293 294 The filename of the notebook
294 295 path : string
295 296 The notebook's directory
296 297 """
297 298 model = self.get_model(name, path)
298 299 nb = model['content']
299 300 self.log.warn("Trusting notebook %s/%s", path, name)
300 301 self.notary.mark_cells(nb, True)
301 302 self.save(model, name, path)
302 303
303 304 def check_and_sign(self, nb, name='', path=''):
304 305 """Check for trusted cells, and sign the notebook.
305 306
306 307 Called as a part of saving notebooks.
307 308
308 309 Parameters
309 310 ----------
310 311 nb : dict
311 The notebook object (in nbformat.current format)
312 The notebook object (in current nbformat)
312 313 name : string
313 314 The filename of the notebook (for logging)
314 315 path : string
315 316 The notebook's directory (for logging)
316 317 """
317 if nb['nbformat'] != current.nbformat:
318 # can't sign old notebooks
319 if nb['nbformat'] != 4:
318 320 return
319 321 if self.notary.check_cells(nb):
320 322 self.notary.sign(nb)
321 323 else:
322 324 self.log.warn("Saving untrusted notebook %s/%s", path, name)
323 325
324 326 def mark_trusted_cells(self, nb, name='', path=''):
325 327 """Mark cells as trusted if the notebook signature matches.
326 328
327 329 Called as a part of loading notebooks.
328 330
329 331 Parameters
330 332 ----------
331 333 nb : dict
332 The notebook object (in nbformat.current format)
334 The notebook object (in current nbformat)
333 335 name : string
334 336 The filename of the notebook (for logging)
335 337 path : string
336 338 The notebook's directory (for logging)
337 339 """
338 340 trusted = self.notary.check_signature(nb)
339 341 if not trusted:
340 342 self.log.warn("Notebook %s/%s is not trusted", path, name)
341 343 self.notary.mark_cells(nb, trusted)
342 344
343 345 def should_list(self, name):
344 346 """Should this file/directory name be displayed in a listing?"""
345 347 return not any(fnmatch(name, glob) for glob in self.hide_globs)
@@ -1,480 +1,481
1 1 # coding: utf-8
2 2 """Test the contents webservice API."""
3 3
4 4 import base64
5 5 import io
6 6 import json
7 7 import os
8 8 import shutil
9 9 from unicodedata import normalize
10 10
11 11 pjoin = os.path.join
12 12
13 13 import requests
14 14
15 15 from IPython.html.utils import url_path_join, url_escape
16 16 from IPython.html.tests.launchnotebook import NotebookTestBase, assert_http_error
17 from IPython.nbformat import current
18 from IPython.nbformat.current import (new_notebook, write, read,
19 new_markdown_cell, from_dict)
17 from IPython.nbformat import read, write, from_dict
18 from IPython.nbformat.v4 import (
19 new_notebook, new_markdown_cell,
20 )
20 21 from IPython.nbformat import v2
21 22 from IPython.utils import py3compat
22 23 from IPython.utils.data import uniq_stable
23 24
24 25
25 26 def notebooks_only(dir_model):
26 27 return [nb for nb in dir_model['content'] if nb['type']=='notebook']
27 28
28 29 def dirs_only(dir_model):
29 30 return [x for x in dir_model['content'] if x['type']=='directory']
30 31
31 32
32 33 class API(object):
33 34 """Wrapper for contents API calls."""
34 35 def __init__(self, base_url):
35 36 self.base_url = base_url
36 37
37 38 def _req(self, verb, path, body=None):
38 39 response = requests.request(verb,
39 40 url_path_join(self.base_url, 'api/contents', path),
40 41 data=body,
41 42 )
42 43 response.raise_for_status()
43 44 return response
44 45
45 46 def list(self, path='/'):
46 47 return self._req('GET', path)
47 48
48 49 def read(self, name, path='/'):
49 50 return self._req('GET', url_path_join(path, name))
50 51
51 52 def create_untitled(self, path='/', ext=None):
52 53 body = None
53 54 if ext:
54 55 body = json.dumps({'ext': ext})
55 56 return self._req('POST', path, body)
56 57
57 58 def upload_untitled(self, body, path='/'):
58 59 return self._req('POST', path, body)
59 60
60 61 def copy_untitled(self, copy_from, path='/'):
61 62 body = json.dumps({'copy_from':copy_from})
62 63 return self._req('POST', path, body)
63 64
64 65 def create(self, name, path='/'):
65 66 return self._req('PUT', url_path_join(path, name))
66 67
67 68 def upload(self, name, body, path='/'):
68 69 return self._req('PUT', url_path_join(path, name), body)
69 70
70 71 def mkdir(self, name, path='/'):
71 72 return self._req('PUT', url_path_join(path, name), json.dumps({'type': 'directory'}))
72 73
73 74 def copy(self, copy_from, copy_to, path='/'):
74 75 body = json.dumps({'copy_from':copy_from})
75 76 return self._req('PUT', url_path_join(path, copy_to), body)
76 77
77 78 def save(self, name, body, path='/'):
78 79 return self._req('PUT', url_path_join(path, name), body)
79 80
80 81 def delete(self, name, path='/'):
81 82 return self._req('DELETE', url_path_join(path, name))
82 83
83 84 def rename(self, name, path, new_name):
84 85 body = json.dumps({'name': new_name})
85 86 return self._req('PATCH', url_path_join(path, name), body)
86 87
87 88 def get_checkpoints(self, name, path):
88 89 return self._req('GET', url_path_join(path, name, 'checkpoints'))
89 90
90 91 def new_checkpoint(self, name, path):
91 92 return self._req('POST', url_path_join(path, name, 'checkpoints'))
92 93
93 94 def restore_checkpoint(self, name, path, checkpoint_id):
94 95 return self._req('POST', url_path_join(path, name, 'checkpoints', checkpoint_id))
95 96
96 97 def delete_checkpoint(self, name, path, checkpoint_id):
97 98 return self._req('DELETE', url_path_join(path, name, 'checkpoints', checkpoint_id))
98 99
99 100 class APITest(NotebookTestBase):
100 101 """Test the kernels web service API"""
101 102 dirs_nbs = [('', 'inroot'),
102 103 ('Directory with spaces in', 'inspace'),
103 104 (u'unicodé', 'innonascii'),
104 105 ('foo', 'a'),
105 106 ('foo', 'b'),
106 107 ('foo', 'name with spaces'),
107 108 ('foo', u'unicodé'),
108 109 ('foo/bar', 'baz'),
109 110 ('ordering', 'A'),
110 111 ('ordering', 'b'),
111 112 ('ordering', 'C'),
112 113 (u'å b', u'ç d'),
113 114 ]
114 115 hidden_dirs = ['.hidden', '__pycache__']
115 116
116 117 dirs = uniq_stable([py3compat.cast_unicode(d) for (d,n) in dirs_nbs])
117 118 del dirs[0] # remove ''
118 119 top_level_dirs = {normalize('NFC', d.split('/')[0]) for d in dirs}
119 120
120 121 @staticmethod
121 122 def _blob_for_name(name):
122 123 return name.encode('utf-8') + b'\xFF'
123 124
124 125 @staticmethod
125 126 def _txt_for_name(name):
126 127 return u'%s text file' % name
127 128
128 129 def setUp(self):
129 130 nbdir = self.notebook_dir.name
130 131 self.blob = os.urandom(100)
131 132 self.b64_blob = base64.encodestring(self.blob).decode('ascii')
132 133
133 134
134 135
135 136 for d in (self.dirs + self.hidden_dirs):
136 137 d.replace('/', os.sep)
137 138 if not os.path.isdir(pjoin(nbdir, d)):
138 139 os.mkdir(pjoin(nbdir, d))
139 140
140 141 for d, name in self.dirs_nbs:
141 142 d = d.replace('/', os.sep)
142 143 # create a notebook
143 144 with io.open(pjoin(nbdir, d, '%s.ipynb' % name), 'w',
144 145 encoding='utf-8') as f:
145 146 nb = new_notebook()
146 write(nb, f, format='ipynb')
147 write(f, nb, version=4)
147 148
148 149 # create a text file
149 150 with io.open(pjoin(nbdir, d, '%s.txt' % name), 'w',
150 151 encoding='utf-8') as f:
151 152 f.write(self._txt_for_name(name))
152 153
153 154 # create a binary file
154 155 with io.open(pjoin(nbdir, d, '%s.blob' % name), 'wb') as f:
155 156 f.write(self._blob_for_name(name))
156 157
157 158 self.api = API(self.base_url())
158 159
159 160 def tearDown(self):
160 161 nbdir = self.notebook_dir.name
161 162
162 163 for dname in (list(self.top_level_dirs) + self.hidden_dirs):
163 164 shutil.rmtree(pjoin(nbdir, dname), ignore_errors=True)
164 165
165 166 if os.path.isfile(pjoin(nbdir, 'inroot.ipynb')):
166 167 os.unlink(pjoin(nbdir, 'inroot.ipynb'))
167 168
168 169 def test_list_notebooks(self):
169 170 nbs = notebooks_only(self.api.list().json())
170 171 self.assertEqual(len(nbs), 1)
171 172 self.assertEqual(nbs[0]['name'], 'inroot.ipynb')
172 173
173 174 nbs = notebooks_only(self.api.list('/Directory with spaces in/').json())
174 175 self.assertEqual(len(nbs), 1)
175 176 self.assertEqual(nbs[0]['name'], 'inspace.ipynb')
176 177
177 178 nbs = notebooks_only(self.api.list(u'/unicodé/').json())
178 179 self.assertEqual(len(nbs), 1)
179 180 self.assertEqual(nbs[0]['name'], 'innonascii.ipynb')
180 181 self.assertEqual(nbs[0]['path'], u'unicodé')
181 182
182 183 nbs = notebooks_only(self.api.list('/foo/bar/').json())
183 184 self.assertEqual(len(nbs), 1)
184 185 self.assertEqual(nbs[0]['name'], 'baz.ipynb')
185 186 self.assertEqual(nbs[0]['path'], 'foo/bar')
186 187
187 188 nbs = notebooks_only(self.api.list('foo').json())
188 189 self.assertEqual(len(nbs), 4)
189 190 nbnames = { normalize('NFC', n['name']) for n in nbs }
190 191 expected = [ u'a.ipynb', u'b.ipynb', u'name with spaces.ipynb', u'unicodé.ipynb']
191 192 expected = { normalize('NFC', name) for name in expected }
192 193 self.assertEqual(nbnames, expected)
193 194
194 195 nbs = notebooks_only(self.api.list('ordering').json())
195 196 nbnames = [n['name'] for n in nbs]
196 197 expected = ['A.ipynb', 'b.ipynb', 'C.ipynb']
197 198 self.assertEqual(nbnames, expected)
198 199
199 200 def test_list_dirs(self):
200 201 dirs = dirs_only(self.api.list().json())
201 202 dir_names = {normalize('NFC', d['name']) for d in dirs}
202 203 self.assertEqual(dir_names, self.top_level_dirs) # Excluding hidden dirs
203 204
204 205 def test_list_nonexistant_dir(self):
205 206 with assert_http_error(404):
206 207 self.api.list('nonexistant')
207 208
208 209 def test_get_nb_contents(self):
209 210 for d, name in self.dirs_nbs:
210 211 nb = self.api.read('%s.ipynb' % name, d+'/').json()
211 212 self.assertEqual(nb['name'], u'%s.ipynb' % name)
212 213 self.assertEqual(nb['type'], 'notebook')
213 214 self.assertIn('content', nb)
214 215 self.assertEqual(nb['format'], 'json')
215 216 self.assertIn('content', nb)
216 217 self.assertIn('metadata', nb['content'])
217 218 self.assertIsInstance(nb['content']['metadata'], dict)
218 219
219 220 def test_get_contents_no_such_file(self):
220 221 # Name that doesn't exist - should be a 404
221 222 with assert_http_error(404):
222 223 self.api.read('q.ipynb', 'foo')
223 224
224 225 def test_get_text_file_contents(self):
225 226 for d, name in self.dirs_nbs:
226 227 model = self.api.read(u'%s.txt' % name, d+'/').json()
227 228 self.assertEqual(model['name'], u'%s.txt' % name)
228 229 self.assertIn('content', model)
229 230 self.assertEqual(model['format'], 'text')
230 231 self.assertEqual(model['type'], 'file')
231 232 self.assertEqual(model['content'], self._txt_for_name(name))
232 233
233 234 # Name that doesn't exist - should be a 404
234 235 with assert_http_error(404):
235 236 self.api.read('q.txt', 'foo')
236 237
237 238 def test_get_binary_file_contents(self):
238 239 for d, name in self.dirs_nbs:
239 240 model = self.api.read(u'%s.blob' % name, d+'/').json()
240 241 self.assertEqual(model['name'], u'%s.blob' % name)
241 242 self.assertIn('content', model)
242 243 self.assertEqual(model['format'], 'base64')
243 244 self.assertEqual(model['type'], 'file')
244 245 b64_data = base64.encodestring(self._blob_for_name(name)).decode('ascii')
245 246 self.assertEqual(model['content'], b64_data)
246 247
247 248 # Name that doesn't exist - should be a 404
248 249 with assert_http_error(404):
249 250 self.api.read('q.txt', 'foo')
250 251
251 252 def _check_created(self, resp, name, path, type='notebook'):
252 253 self.assertEqual(resp.status_code, 201)
253 254 location_header = py3compat.str_to_unicode(resp.headers['Location'])
254 255 self.assertEqual(location_header, url_escape(url_path_join(u'/api/contents', path, name)))
255 256 rjson = resp.json()
256 257 self.assertEqual(rjson['name'], name)
257 258 self.assertEqual(rjson['path'], path)
258 259 self.assertEqual(rjson['type'], type)
259 260 isright = os.path.isdir if type == 'directory' else os.path.isfile
260 261 assert isright(pjoin(
261 262 self.notebook_dir.name,
262 263 path.replace('/', os.sep),
263 264 name,
264 265 ))
265 266
266 267 def test_create_untitled(self):
267 268 resp = self.api.create_untitled(path=u'å b')
268 269 self._check_created(resp, 'Untitled0.ipynb', u'å b')
269 270
270 271 # Second time
271 272 resp = self.api.create_untitled(path=u'å b')
272 273 self._check_created(resp, 'Untitled1.ipynb', u'å b')
273 274
274 275 # And two directories down
275 276 resp = self.api.create_untitled(path='foo/bar')
276 277 self._check_created(resp, 'Untitled0.ipynb', 'foo/bar')
277 278
278 279 def test_create_untitled_txt(self):
279 280 resp = self.api.create_untitled(path='foo/bar', ext='.txt')
280 281 self._check_created(resp, 'untitled0.txt', 'foo/bar', type='file')
281 282
282 283 resp = self.api.read(path='foo/bar', name='untitled0.txt')
283 284 model = resp.json()
284 285 self.assertEqual(model['type'], 'file')
285 286 self.assertEqual(model['format'], 'text')
286 287 self.assertEqual(model['content'], '')
287 288
288 289 def test_upload_untitled(self):
289 290 nb = new_notebook()
290 291 nbmodel = {'content': nb, 'type': 'notebook'}
291 292 resp = self.api.upload_untitled(path=u'å b',
292 293 body=json.dumps(nbmodel))
293 294 self._check_created(resp, 'Untitled0.ipynb', u'å b')
294 295
295 296 def test_upload(self):
296 297 nb = new_notebook()
297 298 nbmodel = {'content': nb, 'type': 'notebook'}
298 299 resp = self.api.upload(u'Upload tést.ipynb', path=u'å b',
299 300 body=json.dumps(nbmodel))
300 301 self._check_created(resp, u'Upload tést.ipynb', u'å b')
301 302
302 303 def test_mkdir(self):
303 304 resp = self.api.mkdir(u'New ∂ir', path=u'å b')
304 305 self._check_created(resp, u'New ∂ir', u'å b', type='directory')
305 306
306 307 def test_mkdir_hidden_400(self):
307 308 with assert_http_error(400):
308 309 resp = self.api.mkdir(u'.hidden', path=u'å b')
309 310
310 311 def test_upload_txt(self):
311 312 body = u'ünicode téxt'
312 313 model = {
313 314 'content' : body,
314 315 'format' : 'text',
315 316 'type' : 'file',
316 317 }
317 318 resp = self.api.upload(u'Upload tést.txt', path=u'å b',
318 319 body=json.dumps(model))
319 320
320 321 # check roundtrip
321 322 resp = self.api.read(path=u'å b', name=u'Upload tést.txt')
322 323 model = resp.json()
323 324 self.assertEqual(model['type'], 'file')
324 325 self.assertEqual(model['format'], 'text')
325 326 self.assertEqual(model['content'], body)
326 327
327 328 def test_upload_b64(self):
328 329 body = b'\xFFblob'
329 330 b64body = base64.encodestring(body).decode('ascii')
330 331 model = {
331 332 'content' : b64body,
332 333 'format' : 'base64',
333 334 'type' : 'file',
334 335 }
335 336 resp = self.api.upload(u'Upload tést.blob', path=u'å b',
336 337 body=json.dumps(model))
337 338
338 339 # check roundtrip
339 340 resp = self.api.read(path=u'å b', name=u'Upload tést.blob')
340 341 model = resp.json()
341 342 self.assertEqual(model['type'], 'file')
342 343 self.assertEqual(model['format'], 'base64')
343 344 decoded = base64.decodestring(model['content'].encode('ascii'))
344 345 self.assertEqual(decoded, body)
345 346
346 347 def test_upload_v2(self):
347 348 nb = v2.new_notebook()
348 349 ws = v2.new_worksheet()
349 350 nb.worksheets.append(ws)
350 351 ws.cells.append(v2.new_code_cell(input='print("hi")'))
351 352 nbmodel = {'content': nb, 'type': 'notebook'}
352 353 resp = self.api.upload(u'Upload tést.ipynb', path=u'å b',
353 354 body=json.dumps(nbmodel))
354 355 self._check_created(resp, u'Upload tést.ipynb', u'å b')
355 356 resp = self.api.read(u'Upload tést.ipynb', u'å b')
356 357 data = resp.json()
357 self.assertEqual(data['content']['nbformat'], current.nbformat)
358 self.assertEqual(data['content']['nbformat'], 4)
358 359
359 360 def test_copy_untitled(self):
360 361 resp = self.api.copy_untitled(u'ç d.ipynb', path=u'å b')
361 362 self._check_created(resp, u'ç d-Copy0.ipynb', u'å b')
362 363
363 364 def test_copy(self):
364 365 resp = self.api.copy(u'ç d.ipynb', u'cøpy.ipynb', path=u'å b')
365 366 self._check_created(resp, u'cøpy.ipynb', u'å b')
366 367
367 368 def test_copy_path(self):
368 369 resp = self.api.copy(u'foo/a.ipynb', u'cøpyfoo.ipynb', path=u'å b')
369 370 self._check_created(resp, u'cøpyfoo.ipynb', u'å b')
370 371
371 372 def test_copy_dir_400(self):
372 373 # can't copy directories
373 374 with assert_http_error(400):
374 375 resp = self.api.copy(u'å b', u'å c')
375 376
376 377 def test_delete(self):
377 378 for d, name in self.dirs_nbs:
378 379 resp = self.api.delete('%s.ipynb' % name, d)
379 380 self.assertEqual(resp.status_code, 204)
380 381
381 382 for d in self.dirs + ['/']:
382 383 nbs = notebooks_only(self.api.list(d).json())
383 384 self.assertEqual(len(nbs), 0)
384 385
385 386 def test_delete_dirs(self):
386 387 # depth-first delete everything, so we don't try to delete empty directories
387 388 for name in sorted(self.dirs + ['/'], key=len, reverse=True):
388 389 listing = self.api.list(name).json()['content']
389 390 for model in listing:
390 391 self.api.delete(model['name'], model['path'])
391 392 listing = self.api.list('/').json()['content']
392 393 self.assertEqual(listing, [])
393 394
394 395 def test_delete_non_empty_dir(self):
395 396 """delete non-empty dir raises 400"""
396 397 with assert_http_error(400):
397 398 self.api.delete(u'å b')
398 399
399 400 def test_rename(self):
400 401 resp = self.api.rename('a.ipynb', 'foo', 'z.ipynb')
401 402 self.assertEqual(resp.headers['Location'].split('/')[-1], 'z.ipynb')
402 403 self.assertEqual(resp.json()['name'], 'z.ipynb')
403 404 assert os.path.isfile(pjoin(self.notebook_dir.name, 'foo', 'z.ipynb'))
404 405
405 406 nbs = notebooks_only(self.api.list('foo').json())
406 407 nbnames = set(n['name'] for n in nbs)
407 408 self.assertIn('z.ipynb', nbnames)
408 409 self.assertNotIn('a.ipynb', nbnames)
409 410
410 411 def test_rename_existing(self):
411 412 with assert_http_error(409):
412 413 self.api.rename('a.ipynb', 'foo', 'b.ipynb')
413 414
414 415 def test_save(self):
415 416 resp = self.api.read('a.ipynb', 'foo')
416 417 nbcontent = json.loads(resp.text)['content']
417 418 nb = from_dict(nbcontent)
418 419 nb.cells.append(new_markdown_cell(u'Created by test ³'))
419 420
420 421 nbmodel= {'name': 'a.ipynb', 'path':'foo', 'content': nb, 'type': 'notebook'}
421 422 resp = self.api.save('a.ipynb', path='foo', body=json.dumps(nbmodel))
422 423
423 424 nbfile = pjoin(self.notebook_dir.name, 'foo', 'a.ipynb')
424 425 with io.open(nbfile, 'r', encoding='utf-8') as f:
425 newnb = read(f, format='ipynb')
426 newnb = read(f, as_version=4)
426 427 self.assertEqual(newnb.cells[0].source,
427 428 u'Created by test ³')
428 429 nbcontent = self.api.read('a.ipynb', 'foo').json()['content']
429 430 newnb = from_dict(nbcontent)
430 431 self.assertEqual(newnb.cells[0].source,
431 432 u'Created by test ³')
432 433
433 434 # Save and rename
434 435 nbmodel= {'name': 'a2.ipynb', 'path':'foo/bar', 'content': nb, 'type': 'notebook'}
435 436 resp = self.api.save('a.ipynb', path='foo', body=json.dumps(nbmodel))
436 437 saved = resp.json()
437 438 self.assertEqual(saved['name'], 'a2.ipynb')
438 439 self.assertEqual(saved['path'], 'foo/bar')
439 440 assert os.path.isfile(pjoin(self.notebook_dir.name,'foo','bar','a2.ipynb'))
440 441 assert not os.path.isfile(pjoin(self.notebook_dir.name, 'foo', 'a.ipynb'))
441 442 with assert_http_error(404):
442 443 self.api.read('a.ipynb', 'foo')
443 444
444 445 def test_checkpoints(self):
445 446 resp = self.api.read('a.ipynb', 'foo')
446 447 r = self.api.new_checkpoint('a.ipynb', 'foo')
447 448 self.assertEqual(r.status_code, 201)
448 449 cp1 = r.json()
449 450 self.assertEqual(set(cp1), {'id', 'last_modified'})
450 451 self.assertEqual(r.headers['Location'].split('/')[-1], cp1['id'])
451 452
452 453 # Modify it
453 454 nbcontent = json.loads(resp.text)['content']
454 455 nb = from_dict(nbcontent)
455 456 hcell = new_markdown_cell('Created by test')
456 457 nb.cells.append(hcell)
457 458 # Save
458 459 nbmodel= {'name': 'a.ipynb', 'path':'foo', 'content': nb, 'type': 'notebook'}
459 460 resp = self.api.save('a.ipynb', path='foo', body=json.dumps(nbmodel))
460 461
461 462 # List checkpoints
462 463 cps = self.api.get_checkpoints('a.ipynb', 'foo').json()
463 464 self.assertEqual(cps, [cp1])
464 465
465 466 nbcontent = self.api.read('a.ipynb', 'foo').json()['content']
466 467 nb = from_dict(nbcontent)
467 468 self.assertEqual(nb.cells[0].source, 'Created by test')
468 469
469 470 # Restore cp1
470 471 r = self.api.restore_checkpoint('a.ipynb', 'foo', cp1['id'])
471 472 self.assertEqual(r.status_code, 204)
472 473 nbcontent = self.api.read('a.ipynb', 'foo').json()['content']
473 474 nb = from_dict(nbcontent)
474 475 self.assertEqual(nb.cells, [])
475 476
476 477 # Delete cp1
477 478 r = self.api.delete_checkpoint('a.ipynb', 'foo', cp1['id'])
478 479 self.assertEqual(r.status_code, 204)
479 480 cps = self.api.get_checkpoints('a.ipynb', 'foo').json()
480 481 self.assertEqual(cps, [])
@@ -1,332 +1,332
1 1 # coding: utf-8
2 2 """Tests for the notebook manager."""
3 3 from __future__ import print_function
4 4
5 5 import logging
6 6 import os
7 7
8 8 from tornado.web import HTTPError
9 9 from unittest import TestCase
10 10 from tempfile import NamedTemporaryFile
11 11
12 from IPython.nbformat import current
12 from IPython.nbformat import v4 as nbformat
13 13
14 14 from IPython.utils.tempdir import TemporaryDirectory
15 15 from IPython.utils.traitlets import TraitError
16 16 from IPython.html.utils import url_path_join
17 17 from IPython.testing import decorators as dec
18 18
19 19 from ..filemanager import FileContentsManager
20 20 from ..manager import ContentsManager
21 21
22 22
23 23 class TestFileContentsManager(TestCase):
24 24
25 25 def test_root_dir(self):
26 26 with TemporaryDirectory() as td:
27 27 fm = FileContentsManager(root_dir=td)
28 28 self.assertEqual(fm.root_dir, td)
29 29
30 30 def test_missing_root_dir(self):
31 31 with TemporaryDirectory() as td:
32 32 root = os.path.join(td, 'notebook', 'dir', 'is', 'missing')
33 33 self.assertRaises(TraitError, FileContentsManager, root_dir=root)
34 34
35 35 def test_invalid_root_dir(self):
36 36 with NamedTemporaryFile() as tf:
37 37 self.assertRaises(TraitError, FileContentsManager, root_dir=tf.name)
38 38
39 39 def test_get_os_path(self):
40 40 # full filesystem path should be returned with correct operating system
41 41 # separators.
42 42 with TemporaryDirectory() as td:
43 43 root = td
44 44 fm = FileContentsManager(root_dir=root)
45 45 path = fm._get_os_path('test.ipynb', '/path/to/notebook/')
46 46 rel_path_list = '/path/to/notebook/test.ipynb'.split('/')
47 47 fs_path = os.path.join(fm.root_dir, *rel_path_list)
48 48 self.assertEqual(path, fs_path)
49 49
50 50 fm = FileContentsManager(root_dir=root)
51 51 path = fm._get_os_path('test.ipynb')
52 52 fs_path = os.path.join(fm.root_dir, 'test.ipynb')
53 53 self.assertEqual(path, fs_path)
54 54
55 55 fm = FileContentsManager(root_dir=root)
56 56 path = fm._get_os_path('test.ipynb', '////')
57 57 fs_path = os.path.join(fm.root_dir, 'test.ipynb')
58 58 self.assertEqual(path, fs_path)
59 59
60 60 def test_checkpoint_subdir(self):
61 61 subd = u'sub ∂ir'
62 62 cp_name = 'test-cp.ipynb'
63 63 with TemporaryDirectory() as td:
64 64 root = td
65 65 os.mkdir(os.path.join(td, subd))
66 66 fm = FileContentsManager(root_dir=root)
67 67 cp_dir = fm.get_checkpoint_path('cp', 'test.ipynb', '/')
68 68 cp_subdir = fm.get_checkpoint_path('cp', 'test.ipynb', '/%s/' % subd)
69 69 self.assertNotEqual(cp_dir, cp_subdir)
70 70 self.assertEqual(cp_dir, os.path.join(root, fm.checkpoint_dir, cp_name))
71 71 self.assertEqual(cp_subdir, os.path.join(root, subd, fm.checkpoint_dir, cp_name))
72 72
73 73
74 74 class TestContentsManager(TestCase):
75 75
76 76 def setUp(self):
77 77 self._temp_dir = TemporaryDirectory()
78 78 self.td = self._temp_dir.name
79 79 self.contents_manager = FileContentsManager(
80 80 root_dir=self.td,
81 81 log=logging.getLogger()
82 82 )
83 83
84 84 def tearDown(self):
85 85 self._temp_dir.cleanup()
86 86
87 87 def make_dir(self, abs_path, rel_path):
88 88 """make subdirectory, rel_path is the relative path
89 89 to that directory from the location where the server started"""
90 90 os_path = os.path.join(abs_path, rel_path)
91 91 try:
92 92 os.makedirs(os_path)
93 93 except OSError:
94 94 print("Directory already exists: %r" % os_path)
95 95 return os_path
96 96
97 97 def add_code_cell(self, nb):
98 output = current.new_output("display_data", {'application/javascript': "alert('hi');"})
99 cell = current.new_code_cell("print('hi')", outputs=[output])
98 output = nbformat.new_output("display_data", {'application/javascript': "alert('hi');"})
99 cell = nbformat.new_code_cell("print('hi')", outputs=[output])
100 100 nb.cells.append(cell)
101 101
102 102 def new_notebook(self):
103 103 cm = self.contents_manager
104 104 model = cm.create_file()
105 105 name = model['name']
106 106 path = model['path']
107 107
108 108 full_model = cm.get_model(name, path)
109 109 nb = full_model['content']
110 110 self.add_code_cell(nb)
111 111
112 112 cm.save(full_model, name, path)
113 113 return nb, name, path
114 114
115 115 def test_create_file(self):
116 116 cm = self.contents_manager
117 117 # Test in root directory
118 118 model = cm.create_file()
119 119 assert isinstance(model, dict)
120 120 self.assertIn('name', model)
121 121 self.assertIn('path', model)
122 122 self.assertEqual(model['name'], 'Untitled0.ipynb')
123 123 self.assertEqual(model['path'], '')
124 124
125 125 # Test in sub-directory
126 126 sub_dir = '/foo/'
127 127 self.make_dir(cm.root_dir, 'foo')
128 128 model = cm.create_file(None, sub_dir)
129 129 assert isinstance(model, dict)
130 130 self.assertIn('name', model)
131 131 self.assertIn('path', model)
132 132 self.assertEqual(model['name'], 'Untitled0.ipynb')
133 133 self.assertEqual(model['path'], sub_dir.strip('/'))
134 134
135 135 def test_get(self):
136 136 cm = self.contents_manager
137 137 # Create a notebook
138 138 model = cm.create_file()
139 139 name = model['name']
140 140 path = model['path']
141 141
142 142 # Check that we 'get' on the notebook we just created
143 143 model2 = cm.get_model(name, path)
144 144 assert isinstance(model2, dict)
145 145 self.assertIn('name', model2)
146 146 self.assertIn('path', model2)
147 147 self.assertEqual(model['name'], name)
148 148 self.assertEqual(model['path'], path)
149 149
150 150 # Test in sub-directory
151 151 sub_dir = '/foo/'
152 152 self.make_dir(cm.root_dir, 'foo')
153 153 model = cm.create_file(None, sub_dir)
154 154 model2 = cm.get_model(name, sub_dir)
155 155 assert isinstance(model2, dict)
156 156 self.assertIn('name', model2)
157 157 self.assertIn('path', model2)
158 158 self.assertIn('content', model2)
159 159 self.assertEqual(model2['name'], 'Untitled0.ipynb')
160 160 self.assertEqual(model2['path'], sub_dir.strip('/'))
161 161
162 162 @dec.skip_win32
163 163 def test_bad_symlink(self):
164 164 cm = self.contents_manager
165 165 path = 'test bad symlink'
166 166 os_path = self.make_dir(cm.root_dir, path)
167 167
168 168 file_model = cm.create_file(path=path, ext='.txt')
169 169
170 170 # create a broken symlink
171 171 os.symlink("target", os.path.join(os_path, "bad symlink"))
172 172 model = cm.get_model(path)
173 173 self.assertEqual(model['content'], [file_model])
174 174
175 175 @dec.skip_win32
176 176 def test_good_symlink(self):
177 177 cm = self.contents_manager
178 178 path = 'test good symlink'
179 179 os_path = self.make_dir(cm.root_dir, path)
180 180
181 181 file_model = cm.create_file(path=path, ext='.txt')
182 182
183 183 # create a good symlink
184 184 os.symlink(file_model['name'], os.path.join(os_path, "good symlink"))
185 185 symlink_model = cm.get_model(name="good symlink", path=path, content=False)
186 186
187 187 dir_model = cm.get_model(path)
188 188 self.assertEqual(
189 189 sorted(dir_model['content'], key=lambda x: x['name']),
190 190 [symlink_model, file_model],
191 191 )
192 192
193 193 def test_update(self):
194 194 cm = self.contents_manager
195 195 # Create a notebook
196 196 model = cm.create_file()
197 197 name = model['name']
198 198 path = model['path']
199 199
200 200 # Change the name in the model for rename
201 201 model['name'] = 'test.ipynb'
202 202 model = cm.update(model, name, path)
203 203 assert isinstance(model, dict)
204 204 self.assertIn('name', model)
205 205 self.assertIn('path', model)
206 206 self.assertEqual(model['name'], 'test.ipynb')
207 207
208 208 # Make sure the old name is gone
209 209 self.assertRaises(HTTPError, cm.get_model, name, path)
210 210
211 211 # Test in sub-directory
212 212 # Create a directory and notebook in that directory
213 213 sub_dir = '/foo/'
214 214 self.make_dir(cm.root_dir, 'foo')
215 215 model = cm.create_file(None, sub_dir)
216 216 name = model['name']
217 217 path = model['path']
218 218
219 219 # Change the name in the model for rename
220 220 model['name'] = 'test_in_sub.ipynb'
221 221 model = cm.update(model, name, path)
222 222 assert isinstance(model, dict)
223 223 self.assertIn('name', model)
224 224 self.assertIn('path', model)
225 225 self.assertEqual(model['name'], 'test_in_sub.ipynb')
226 226 self.assertEqual(model['path'], sub_dir.strip('/'))
227 227
228 228 # Make sure the old name is gone
229 229 self.assertRaises(HTTPError, cm.get_model, name, path)
230 230
231 231 def test_save(self):
232 232 cm = self.contents_manager
233 233 # Create a notebook
234 234 model = cm.create_file()
235 235 name = model['name']
236 236 path = model['path']
237 237
238 238 # Get the model with 'content'
239 239 full_model = cm.get_model(name, path)
240 240
241 241 # Save the notebook
242 242 model = cm.save(full_model, name, path)
243 243 assert isinstance(model, dict)
244 244 self.assertIn('name', model)
245 245 self.assertIn('path', model)
246 246 self.assertEqual(model['name'], name)
247 247 self.assertEqual(model['path'], path)
248 248
249 249 # Test in sub-directory
250 250 # Create a directory and notebook in that directory
251 251 sub_dir = '/foo/'
252 252 self.make_dir(cm.root_dir, 'foo')
253 253 model = cm.create_file(None, sub_dir)
254 254 name = model['name']
255 255 path = model['path']
256 256 model = cm.get_model(name, path)
257 257
258 258 # Change the name in the model for rename
259 259 model = cm.save(model, name, path)
260 260 assert isinstance(model, dict)
261 261 self.assertIn('name', model)
262 262 self.assertIn('path', model)
263 263 self.assertEqual(model['name'], 'Untitled0.ipynb')
264 264 self.assertEqual(model['path'], sub_dir.strip('/'))
265 265
266 266 def test_delete(self):
267 267 cm = self.contents_manager
268 268 # Create a notebook
269 269 nb, name, path = self.new_notebook()
270 270
271 271 # Delete the notebook
272 272 cm.delete(name, path)
273 273
274 274 # Check that a 'get' on the deleted notebook raises and error
275 275 self.assertRaises(HTTPError, cm.get_model, name, path)
276 276
277 277 def test_copy(self):
278 278 cm = self.contents_manager
279 279 path = u'å b'
280 280 name = u'nb √.ipynb'
281 281 os.mkdir(os.path.join(cm.root_dir, path))
282 282 orig = cm.create_file({'name' : name}, path=path)
283 283
284 284 # copy with unspecified name
285 285 copy = cm.copy(name, path=path)
286 286 self.assertEqual(copy['name'], orig['name'].replace('.ipynb', '-Copy0.ipynb'))
287 287
288 288 # copy with specified name
289 289 copy2 = cm.copy(name, u'copy 2.ipynb', path=path)
290 290 self.assertEqual(copy2['name'], u'copy 2.ipynb')
291 291
292 292 def test_trust_notebook(self):
293 293 cm = self.contents_manager
294 294 nb, name, path = self.new_notebook()
295 295
296 296 untrusted = cm.get_model(name, path)['content']
297 297 assert not cm.notary.check_cells(untrusted)
298 298
299 299 # print(untrusted)
300 300 cm.trust_notebook(name, path)
301 301 trusted = cm.get_model(name, path)['content']
302 302 # print(trusted)
303 303 assert cm.notary.check_cells(trusted)
304 304
305 305 def test_mark_trusted_cells(self):
306 306 cm = self.contents_manager
307 307 nb, name, path = self.new_notebook()
308 308
309 309 cm.mark_trusted_cells(nb, name, path)
310 310 for cell in nb.cells:
311 311 if cell.cell_type == 'code':
312 312 assert not cell.metadata.trusted
313 313
314 314 cm.trust_notebook(name, path)
315 315 nb = cm.get_model(name, path)['content']
316 316 for cell in nb.cells:
317 317 if cell.cell_type == 'code':
318 318 assert cell.metadata.trusted
319 319
320 320 def test_check_and_sign(self):
321 321 cm = self.contents_manager
322 322 nb, name, path = self.new_notebook()
323 323
324 324 cm.mark_trusted_cells(nb, name, path)
325 325 cm.check_and_sign(nb, name, path)
326 326 assert not cm.notary.check_signature(nb)
327 327
328 328 cm.trust_notebook(name, path)
329 329 nb = cm.get_model(name, path)['content']
330 330 cm.mark_trusted_cells(nb, name, path)
331 331 cm.check_and_sign(nb, name, path)
332 332 assert cm.notary.check_signature(nb)
@@ -1,116 +1,117
1 1 """Test the sessions web service API."""
2 2
3 3 import errno
4 4 import io
5 5 import os
6 6 import json
7 7 import requests
8 8 import shutil
9 9
10 10 pjoin = os.path.join
11 11
12 12 from IPython.html.utils import url_path_join
13 13 from IPython.html.tests.launchnotebook import NotebookTestBase, assert_http_error
14 from IPython.nbformat.current import new_notebook, write
14 from IPython.nbformat.v4 import new_notebook
15 from IPython.nbformat import write
15 16
16 17 class SessionAPI(object):
17 18 """Wrapper for notebook API calls."""
18 19 def __init__(self, base_url):
19 20 self.base_url = base_url
20 21
21 22 def _req(self, verb, path, body=None):
22 23 response = requests.request(verb,
23 24 url_path_join(self.base_url, 'api/sessions', path), data=body)
24 25
25 26 if 400 <= response.status_code < 600:
26 27 try:
27 28 response.reason = response.json()['message']
28 29 except:
29 30 pass
30 31 response.raise_for_status()
31 32
32 33 return response
33 34
34 35 def list(self):
35 36 return self._req('GET', '')
36 37
37 38 def get(self, id):
38 39 return self._req('GET', id)
39 40
40 41 def create(self, name, path, kernel_name='python'):
41 42 body = json.dumps({'notebook': {'name':name, 'path':path},
42 43 'kernel': {'name': kernel_name}})
43 44 return self._req('POST', '', body)
44 45
45 46 def modify(self, id, name, path):
46 47 body = json.dumps({'notebook': {'name':name, 'path':path}})
47 48 return self._req('PATCH', id, body)
48 49
49 50 def delete(self, id):
50 51 return self._req('DELETE', id)
51 52
52 53 class SessionAPITest(NotebookTestBase):
53 54 """Test the sessions web service API"""
54 55 def setUp(self):
55 56 nbdir = self.notebook_dir.name
56 57 try:
57 58 os.mkdir(pjoin(nbdir, 'foo'))
58 59 except OSError as e:
59 60 # Deleting the folder in an earlier test may have failed
60 61 if e.errno != errno.EEXIST:
61 62 raise
62 63
63 64 with io.open(pjoin(nbdir, 'foo', 'nb1.ipynb'), 'w',
64 65 encoding='utf-8') as f:
65 66 nb = new_notebook()
66 write(nb, f, format='ipynb')
67 write(f, nb, version=4)
67 68
68 69 self.sess_api = SessionAPI(self.base_url())
69 70
70 71 def tearDown(self):
71 72 for session in self.sess_api.list().json():
72 73 self.sess_api.delete(session['id'])
73 74 shutil.rmtree(pjoin(self.notebook_dir.name, 'foo'),
74 75 ignore_errors=True)
75 76
76 77 def test_create(self):
77 78 sessions = self.sess_api.list().json()
78 79 self.assertEqual(len(sessions), 0)
79 80
80 81 resp = self.sess_api.create('nb1.ipynb', 'foo')
81 82 self.assertEqual(resp.status_code, 201)
82 83 newsession = resp.json()
83 84 self.assertIn('id', newsession)
84 85 self.assertEqual(newsession['notebook']['name'], 'nb1.ipynb')
85 86 self.assertEqual(newsession['notebook']['path'], 'foo')
86 87 self.assertEqual(resp.headers['Location'], '/api/sessions/{0}'.format(newsession['id']))
87 88
88 89 sessions = self.sess_api.list().json()
89 90 self.assertEqual(sessions, [newsession])
90 91
91 92 # Retrieve it
92 93 sid = newsession['id']
93 94 got = self.sess_api.get(sid).json()
94 95 self.assertEqual(got, newsession)
95 96
96 97 def test_delete(self):
97 98 newsession = self.sess_api.create('nb1.ipynb', 'foo').json()
98 99 sid = newsession['id']
99 100
100 101 resp = self.sess_api.delete(sid)
101 102 self.assertEqual(resp.status_code, 204)
102 103
103 104 sessions = self.sess_api.list().json()
104 105 self.assertEqual(sessions, [])
105 106
106 107 with assert_http_error(404):
107 108 self.sess_api.get(sid)
108 109
109 110 def test_modify(self):
110 111 newsession = self.sess_api.create('nb1.ipynb', 'foo').json()
111 112 sid = newsession['id']
112 113
113 114 changed = self.sess_api.modify(sid, 'nb2.ipynb', '').json()
114 115 self.assertEqual(changed['id'], sid)
115 116 self.assertEqual(changed['notebook']['name'], 'nb2.ipynb')
116 117 self.assertEqual(changed['notebook']['path'], '')
@@ -1,151 +1,152
1 1 # coding: utf-8
2 2 """Test the /files/ handler."""
3 3
4 4 import io
5 5 import os
6 6 from unicodedata import normalize
7 7
8 8 pjoin = os.path.join
9 9
10 10 import requests
11 11 import json
12 12
13 from IPython.nbformat.current import (new_notebook, write,
13 from IPython.nbformat import write
14 from IPython.nbformat.v4 import (new_notebook,
14 15 new_markdown_cell, new_code_cell,
15 16 new_output)
16 17
17 18 from IPython.html.utils import url_path_join
18 19 from .launchnotebook import NotebookTestBase
19 20 from IPython.utils import py3compat
20 21
21 22
22 23 class FilesTest(NotebookTestBase):
23 24 def test_hidden_files(self):
24 25 not_hidden = [
25 26 u'å b',
26 27 u'å b/ç. d',
27 28 ]
28 29 hidden = [
29 30 u'.å b',
30 31 u'å b/.ç d',
31 32 ]
32 33 dirs = not_hidden + hidden
33 34
34 35 nbdir = self.notebook_dir.name
35 36 for d in dirs:
36 37 path = pjoin(nbdir, d.replace('/', os.sep))
37 38 if not os.path.exists(path):
38 39 os.mkdir(path)
39 40 with open(pjoin(path, 'foo'), 'w') as f:
40 41 f.write('foo')
41 42 with open(pjoin(path, '.foo'), 'w') as f:
42 43 f.write('.foo')
43 44 url = self.base_url()
44 45
45 46 for d in not_hidden:
46 47 path = pjoin(nbdir, d.replace('/', os.sep))
47 48 r = requests.get(url_path_join(url, 'files', d, 'foo'))
48 49 r.raise_for_status()
49 50 self.assertEqual(r.text, 'foo')
50 51 r = requests.get(url_path_join(url, 'files', d, '.foo'))
51 52 self.assertEqual(r.status_code, 404)
52 53
53 54 for d in hidden:
54 55 path = pjoin(nbdir, d.replace('/', os.sep))
55 56 for foo in ('foo', '.foo'):
56 57 r = requests.get(url_path_join(url, 'files', d, foo))
57 58 self.assertEqual(r.status_code, 404)
58 59
59 60 def test_contents_manager(self):
60 61 "make sure ContentsManager returns right files (ipynb, bin, txt)."
61 62
62 63 nbdir = self.notebook_dir.name
63 64 base = self.base_url()
64 65
65 66 nb = new_notebook(
66 67 cells=[
67 68 new_markdown_cell(u'Created by test ³'),
68 69 new_code_cell("print(2*6)", outputs=[
69 70 new_output("stream", text="12"),
70 71 ])
71 72 ]
72 73 )
73 74
74 75 with io.open(pjoin(nbdir, 'testnb.ipynb'), 'w',
75 76 encoding='utf-8') as f:
76 write(nb, f)
77 write(f, nb, version=4)
77 78
78 79 with io.open(pjoin(nbdir, 'test.bin'), 'wb') as f:
79 80 f.write(b'\xff' + os.urandom(5))
80 81 f.close()
81 82
82 83 with io.open(pjoin(nbdir, 'test.txt'), 'w') as f:
83 84 f.write(u'foobar')
84 85 f.close()
85 86
86 87 r = requests.get(url_path_join(base, 'files', 'testnb.ipynb'))
87 88 self.assertEqual(r.status_code, 200)
88 89 self.assertIn('print(2*6)', r.text)
89 90 json.loads(r.text)
90 91
91 92 r = requests.get(url_path_join(base, 'files', 'test.bin'))
92 93 self.assertEqual(r.status_code, 200)
93 94 self.assertEqual(r.headers['content-type'], 'application/octet-stream')
94 95 self.assertEqual(r.content[:1], b'\xff')
95 96 self.assertEqual(len(r.content), 6)
96 97
97 98 r = requests.get(url_path_join(base, 'files', 'test.txt'))
98 99 self.assertEqual(r.status_code, 200)
99 100 self.assertEqual(r.headers['content-type'], 'text/plain')
100 101 self.assertEqual(r.text, 'foobar')
101 102
102 103 def test_download(self):
103 104 nbdir = self.notebook_dir.name
104 105 base = self.base_url()
105 106
106 107 text = 'hello'
107 108 with open(pjoin(nbdir, 'test.txt'), 'w') as f:
108 109 f.write(text)
109 110
110 111 r = requests.get(url_path_join(base, 'files', 'test.txt'))
111 112 disposition = r.headers.get('Content-Disposition', '')
112 113 self.assertNotIn('attachment', disposition)
113 114
114 115 r = requests.get(url_path_join(base, 'files', 'test.txt') + '?download=1')
115 116 disposition = r.headers.get('Content-Disposition', '')
116 117 self.assertIn('attachment', disposition)
117 118 self.assertIn('filename="test.txt"', disposition)
118 119
119 120 def test_old_files_redirect(self):
120 121 """pre-2.0 'files/' prefixed links are properly redirected"""
121 122 nbdir = self.notebook_dir.name
122 123 base = self.base_url()
123 124
124 125 os.mkdir(pjoin(nbdir, 'files'))
125 126 os.makedirs(pjoin(nbdir, 'sub', 'files'))
126 127
127 128 for prefix in ('', 'sub'):
128 129 with open(pjoin(nbdir, prefix, 'files', 'f1.txt'), 'w') as f:
129 130 f.write(prefix + '/files/f1')
130 131 with open(pjoin(nbdir, prefix, 'files', 'f2.txt'), 'w') as f:
131 132 f.write(prefix + '/files/f2')
132 133 with open(pjoin(nbdir, prefix, 'f2.txt'), 'w') as f:
133 134 f.write(prefix + '/f2')
134 135 with open(pjoin(nbdir, prefix, 'f3.txt'), 'w') as f:
135 136 f.write(prefix + '/f3')
136 137
137 138 url = url_path_join(base, 'notebooks', prefix, 'files', 'f1.txt')
138 139 r = requests.get(url)
139 140 self.assertEqual(r.status_code, 200)
140 141 self.assertEqual(r.text, prefix + '/files/f1')
141 142
142 143 url = url_path_join(base, 'notebooks', prefix, 'files', 'f2.txt')
143 144 r = requests.get(url)
144 145 self.assertEqual(r.status_code, 200)
145 146 self.assertEqual(r.text, prefix + '/files/f2')
146 147
147 148 url = url_path_join(base, 'notebooks', prefix, 'files', 'f3.txt')
148 149 r = requests.get(url)
149 150 self.assertEqual(r.status_code, 200)
150 151 self.assertEqual(r.text, prefix + '/f3')
151 152
General Comments 0
You need to be logged in to leave comments. Login now