##// END OF EJS Templates
use from_dict for dict->notebook...
MinRK -
Show More
@@ -1,148 +1,148 b''
1 """Tornado handlers for nbconvert."""
1 """Tornado handlers for nbconvert."""
2
2
3 # Copyright (c) IPython Development Team.
3 # Copyright (c) IPython Development Team.
4 # Distributed under the terms of the Modified BSD License.
4 # Distributed under the terms of the Modified BSD License.
5
5
6 import io
6 import io
7 import os
7 import os
8 import zipfile
8 import zipfile
9
9
10 from tornado import web
10 from tornado import web
11
11
12 from ..base.handlers import (
12 from ..base.handlers import (
13 IPythonHandler, FilesRedirectHandler,
13 IPythonHandler, FilesRedirectHandler,
14 notebook_path_regex, path_regex,
14 notebook_path_regex, path_regex,
15 )
15 )
16 from IPython.nbformat.current import to_notebook_json
16 from IPython.nbformat.current import from_dict
17
17
18 from IPython.utils.py3compat import cast_bytes
18 from IPython.utils.py3compat import cast_bytes
19
19
20 def find_resource_files(output_files_dir):
20 def find_resource_files(output_files_dir):
21 files = []
21 files = []
22 for dirpath, dirnames, filenames in os.walk(output_files_dir):
22 for dirpath, dirnames, filenames in os.walk(output_files_dir):
23 files.extend([os.path.join(dirpath, f) for f in filenames])
23 files.extend([os.path.join(dirpath, f) for f in filenames])
24 return files
24 return files
25
25
26 def respond_zip(handler, name, output, resources):
26 def respond_zip(handler, name, output, resources):
27 """Zip up the output and resource files and respond with the zip file.
27 """Zip up the output and resource files and respond with the zip file.
28
28
29 Returns True if it has served a zip file, False if there are no resource
29 Returns True if it has served a zip file, False if there are no resource
30 files, in which case we serve the plain output file.
30 files, in which case we serve the plain output file.
31 """
31 """
32 # Check if we have resource files we need to zip
32 # Check if we have resource files we need to zip
33 output_files = resources.get('outputs', None)
33 output_files = resources.get('outputs', None)
34 if not output_files:
34 if not output_files:
35 return False
35 return False
36
36
37 # Headers
37 # Headers
38 zip_filename = os.path.splitext(name)[0] + '.zip'
38 zip_filename = os.path.splitext(name)[0] + '.zip'
39 handler.set_header('Content-Disposition',
39 handler.set_header('Content-Disposition',
40 'attachment; filename="%s"' % zip_filename)
40 'attachment; filename="%s"' % zip_filename)
41 handler.set_header('Content-Type', 'application/zip')
41 handler.set_header('Content-Type', 'application/zip')
42
42
43 # Prepare the zip file
43 # Prepare the zip file
44 buffer = io.BytesIO()
44 buffer = io.BytesIO()
45 zipf = zipfile.ZipFile(buffer, mode='w', compression=zipfile.ZIP_DEFLATED)
45 zipf = zipfile.ZipFile(buffer, mode='w', compression=zipfile.ZIP_DEFLATED)
46 output_filename = os.path.splitext(name)[0] + '.' + resources['output_extension']
46 output_filename = os.path.splitext(name)[0] + '.' + resources['output_extension']
47 zipf.writestr(output_filename, cast_bytes(output, 'utf-8'))
47 zipf.writestr(output_filename, cast_bytes(output, 'utf-8'))
48 for filename, data in output_files.items():
48 for filename, data in output_files.items():
49 zipf.writestr(os.path.basename(filename), data)
49 zipf.writestr(os.path.basename(filename), data)
50 zipf.close()
50 zipf.close()
51
51
52 handler.finish(buffer.getvalue())
52 handler.finish(buffer.getvalue())
53 return True
53 return True
54
54
55 def get_exporter(format, **kwargs):
55 def get_exporter(format, **kwargs):
56 """get an exporter, raising appropriate errors"""
56 """get an exporter, raising appropriate errors"""
57 # if this fails, will raise 500
57 # if this fails, will raise 500
58 try:
58 try:
59 from IPython.nbconvert.exporters.export import exporter_map
59 from IPython.nbconvert.exporters.export import exporter_map
60 except ImportError as e:
60 except ImportError as e:
61 raise web.HTTPError(500, "Could not import nbconvert: %s" % e)
61 raise web.HTTPError(500, "Could not import nbconvert: %s" % e)
62
62
63 try:
63 try:
64 Exporter = exporter_map[format]
64 Exporter = exporter_map[format]
65 except KeyError:
65 except KeyError:
66 # should this be 400?
66 # should this be 400?
67 raise web.HTTPError(404, u"No exporter for format: %s" % format)
67 raise web.HTTPError(404, u"No exporter for format: %s" % format)
68
68
69 try:
69 try:
70 return Exporter(**kwargs)
70 return Exporter(**kwargs)
71 except Exception as e:
71 except Exception as e:
72 raise web.HTTPError(500, "Could not construct Exporter: %s" % e)
72 raise web.HTTPError(500, "Could not construct Exporter: %s" % e)
73
73
74 class NbconvertFileHandler(IPythonHandler):
74 class NbconvertFileHandler(IPythonHandler):
75
75
76 SUPPORTED_METHODS = ('GET',)
76 SUPPORTED_METHODS = ('GET',)
77
77
78 @web.authenticated
78 @web.authenticated
79 def get(self, format, path='', name=None):
79 def get(self, format, path='', name=None):
80
80
81 exporter = get_exporter(format, config=self.config, log=self.log)
81 exporter = get_exporter(format, config=self.config, log=self.log)
82
82
83 path = path.strip('/')
83 path = path.strip('/')
84 model = self.contents_manager.get_model(name=name, path=path)
84 model = self.contents_manager.get_model(name=name, path=path)
85
85
86 self.set_header('Last-Modified', model['last_modified'])
86 self.set_header('Last-Modified', model['last_modified'])
87
87
88 try:
88 try:
89 output, resources = exporter.from_notebook_node(model['content'])
89 output, resources = exporter.from_notebook_node(model['content'])
90 except Exception as e:
90 except Exception as e:
91 raise web.HTTPError(500, "nbconvert failed: %s" % e)
91 raise web.HTTPError(500, "nbconvert failed: %s" % e)
92
92
93 if respond_zip(self, name, output, resources):
93 if respond_zip(self, name, output, resources):
94 return
94 return
95
95
96 # Force download if requested
96 # Force download if requested
97 if self.get_argument('download', 'false').lower() == 'true':
97 if self.get_argument('download', 'false').lower() == 'true':
98 filename = os.path.splitext(name)[0] + '.' + resources['output_extension']
98 filename = os.path.splitext(name)[0] + '.' + resources['output_extension']
99 self.set_header('Content-Disposition',
99 self.set_header('Content-Disposition',
100 'attachment; filename="%s"' % filename)
100 'attachment; filename="%s"' % filename)
101
101
102 # MIME type
102 # MIME type
103 if exporter.output_mimetype:
103 if exporter.output_mimetype:
104 self.set_header('Content-Type',
104 self.set_header('Content-Type',
105 '%s; charset=utf-8' % exporter.output_mimetype)
105 '%s; charset=utf-8' % exporter.output_mimetype)
106
106
107 self.finish(output)
107 self.finish(output)
108
108
109 class NbconvertPostHandler(IPythonHandler):
109 class NbconvertPostHandler(IPythonHandler):
110 SUPPORTED_METHODS = ('POST',)
110 SUPPORTED_METHODS = ('POST',)
111
111
112 @web.authenticated
112 @web.authenticated
113 def post(self, format):
113 def post(self, format):
114 exporter = get_exporter(format, config=self.config)
114 exporter = get_exporter(format, config=self.config)
115
115
116 model = self.get_json_body()
116 model = self.get_json_body()
117 name = model.get('name', 'notebook.ipynb')
117 name = model.get('name', 'notebook.ipynb')
118 nbnode = to_notebook_json(model['content'])
118 nbnode = from_dict(model['content'])
119
119
120 try:
120 try:
121 output, resources = exporter.from_notebook_node(nbnode)
121 output, resources = exporter.from_notebook_node(nbnode)
122 except Exception as e:
122 except Exception as e:
123 raise web.HTTPError(500, "nbconvert failed: %s" % e)
123 raise web.HTTPError(500, "nbconvert failed: %s" % e)
124
124
125 if respond_zip(self, name, output, resources):
125 if respond_zip(self, name, output, resources):
126 return
126 return
127
127
128 # MIME type
128 # MIME type
129 if exporter.output_mimetype:
129 if exporter.output_mimetype:
130 self.set_header('Content-Type',
130 self.set_header('Content-Type',
131 '%s; charset=utf-8' % exporter.output_mimetype)
131 '%s; charset=utf-8' % exporter.output_mimetype)
132
132
133 self.finish(output)
133 self.finish(output)
134
134
135
135
136 #-----------------------------------------------------------------------------
136 #-----------------------------------------------------------------------------
137 # URL to handler mappings
137 # URL to handler mappings
138 #-----------------------------------------------------------------------------
138 #-----------------------------------------------------------------------------
139
139
140 _format_regex = r"(?P<format>\w+)"
140 _format_regex = r"(?P<format>\w+)"
141
141
142
142
143 default_handlers = [
143 default_handlers = [
144 (r"/nbconvert/%s%s" % (_format_regex, notebook_path_regex),
144 (r"/nbconvert/%s%s" % (_format_regex, notebook_path_regex),
145 NbconvertFileHandler),
145 NbconvertFileHandler),
146 (r"/nbconvert/%s" % _format_regex, NbconvertPostHandler),
146 (r"/nbconvert/%s" % _format_regex, NbconvertPostHandler),
147 (r"/nbconvert/html%s" % path_regex, FilesRedirectHandler),
147 (r"/nbconvert/html%s" % path_regex, FilesRedirectHandler),
148 ]
148 ]
@@ -1,545 +1,545 b''
1 """A contents manager that uses the local file system for storage."""
1 """A contents manager that uses the local file system for storage."""
2
2
3 # Copyright (c) IPython Development Team.
3 # Copyright (c) IPython Development Team.
4 # Distributed under the terms of the Modified BSD License.
4 # Distributed under the terms of the Modified BSD License.
5
5
6 import base64
6 import base64
7 import io
7 import io
8 import os
8 import os
9 import glob
9 import glob
10 import shutil
10 import shutil
11
11
12 from tornado import web
12 from tornado import web
13
13
14 from .manager import ContentsManager
14 from .manager import ContentsManager
15 from IPython.nbformat import current
15 from IPython.nbformat import current
16 from IPython.utils.io import atomic_writing
16 from IPython.utils.io import atomic_writing
17 from IPython.utils.path import ensure_dir_exists
17 from IPython.utils.path import ensure_dir_exists
18 from IPython.utils.traitlets import Unicode, Bool, TraitError
18 from IPython.utils.traitlets import Unicode, Bool, TraitError
19 from IPython.utils.py3compat import getcwd
19 from IPython.utils.py3compat import getcwd
20 from IPython.utils import tz
20 from IPython.utils import tz
21 from IPython.html.utils import is_hidden, to_os_path, url_path_join
21 from IPython.html.utils import is_hidden, to_os_path, url_path_join
22
22
23
23
24 class FileContentsManager(ContentsManager):
24 class FileContentsManager(ContentsManager):
25
25
26 root_dir = Unicode(getcwd(), config=True)
26 root_dir = Unicode(getcwd(), config=True)
27
27
28 save_script = Bool(False, config=True, help='DEPRECATED, IGNORED')
28 save_script = Bool(False, config=True, help='DEPRECATED, IGNORED')
29 def _save_script_changed(self):
29 def _save_script_changed(self):
30 self.log.warn("""
30 self.log.warn("""
31 Automatically saving notebooks as scripts has been removed.
31 Automatically saving notebooks as scripts has been removed.
32 Use `ipython nbconvert --to python [notebook]` instead.
32 Use `ipython nbconvert --to python [notebook]` instead.
33 """)
33 """)
34
34
35 def _root_dir_changed(self, name, old, new):
35 def _root_dir_changed(self, name, old, new):
36 """Do a bit of validation of the root_dir."""
36 """Do a bit of validation of the root_dir."""
37 if not os.path.isabs(new):
37 if not os.path.isabs(new):
38 # If we receive a non-absolute path, make it absolute.
38 # If we receive a non-absolute path, make it absolute.
39 self.root_dir = os.path.abspath(new)
39 self.root_dir = os.path.abspath(new)
40 return
40 return
41 if not os.path.isdir(new):
41 if not os.path.isdir(new):
42 raise TraitError("%r is not a directory" % new)
42 raise TraitError("%r is not a directory" % new)
43
43
44 checkpoint_dir = Unicode('.ipynb_checkpoints', config=True,
44 checkpoint_dir = Unicode('.ipynb_checkpoints', config=True,
45 help="""The directory name in which to keep file checkpoints
45 help="""The directory name in which to keep file checkpoints
46
46
47 This is a path relative to the file's own directory.
47 This is a path relative to the file's own directory.
48
48
49 By default, it is .ipynb_checkpoints
49 By default, it is .ipynb_checkpoints
50 """
50 """
51 )
51 )
52
52
53 def _copy(self, src, dest):
53 def _copy(self, src, dest):
54 """copy src to dest
54 """copy src to dest
55
55
56 like shutil.copy2, but log errors in copystat
56 like shutil.copy2, but log errors in copystat
57 """
57 """
58 shutil.copyfile(src, dest)
58 shutil.copyfile(src, dest)
59 try:
59 try:
60 shutil.copystat(src, dest)
60 shutil.copystat(src, dest)
61 except OSError as e:
61 except OSError as e:
62 self.log.debug("copystat on %s failed", dest, exc_info=True)
62 self.log.debug("copystat on %s failed", dest, exc_info=True)
63
63
64 def _get_os_path(self, name=None, path=''):
64 def _get_os_path(self, name=None, path=''):
65 """Given a filename and API path, return its file system
65 """Given a filename and API path, return its file system
66 path.
66 path.
67
67
68 Parameters
68 Parameters
69 ----------
69 ----------
70 name : string
70 name : string
71 A filename
71 A filename
72 path : string
72 path : string
73 The relative API path to the named file.
73 The relative API path to the named file.
74
74
75 Returns
75 Returns
76 -------
76 -------
77 path : string
77 path : string
78 API path to be evaluated relative to root_dir.
78 API path to be evaluated relative to root_dir.
79 """
79 """
80 if name is not None:
80 if name is not None:
81 path = url_path_join(path, name)
81 path = url_path_join(path, name)
82 return to_os_path(path, self.root_dir)
82 return to_os_path(path, self.root_dir)
83
83
84 def path_exists(self, path):
84 def path_exists(self, path):
85 """Does the API-style path refer to an extant directory?
85 """Does the API-style path refer to an extant directory?
86
86
87 API-style wrapper for os.path.isdir
87 API-style wrapper for os.path.isdir
88
88
89 Parameters
89 Parameters
90 ----------
90 ----------
91 path : string
91 path : string
92 The path to check. This is an API path (`/` separated,
92 The path to check. This is an API path (`/` separated,
93 relative to root_dir).
93 relative to root_dir).
94
94
95 Returns
95 Returns
96 -------
96 -------
97 exists : bool
97 exists : bool
98 Whether the path is indeed a directory.
98 Whether the path is indeed a directory.
99 """
99 """
100 path = path.strip('/')
100 path = path.strip('/')
101 os_path = self._get_os_path(path=path)
101 os_path = self._get_os_path(path=path)
102 return os.path.isdir(os_path)
102 return os.path.isdir(os_path)
103
103
104 def is_hidden(self, path):
104 def is_hidden(self, path):
105 """Does the API style path correspond to a hidden directory or file?
105 """Does the API style path correspond to a hidden directory or file?
106
106
107 Parameters
107 Parameters
108 ----------
108 ----------
109 path : string
109 path : string
110 The path to check. This is an API path (`/` separated,
110 The path to check. This is an API path (`/` separated,
111 relative to root_dir).
111 relative to root_dir).
112
112
113 Returns
113 Returns
114 -------
114 -------
115 exists : bool
115 exists : bool
116 Whether the path is hidden.
116 Whether the path is hidden.
117
117
118 """
118 """
119 path = path.strip('/')
119 path = path.strip('/')
120 os_path = self._get_os_path(path=path)
120 os_path = self._get_os_path(path=path)
121 return is_hidden(os_path, self.root_dir)
121 return is_hidden(os_path, self.root_dir)
122
122
123 def file_exists(self, name, path=''):
123 def file_exists(self, name, path=''):
124 """Returns True if the file exists, else returns False.
124 """Returns True if the file exists, else returns False.
125
125
126 API-style wrapper for os.path.isfile
126 API-style wrapper for os.path.isfile
127
127
128 Parameters
128 Parameters
129 ----------
129 ----------
130 name : string
130 name : string
131 The name of the file you are checking.
131 The name of the file you are checking.
132 path : string
132 path : string
133 The relative path to the file's directory (with '/' as separator)
133 The relative path to the file's directory (with '/' as separator)
134
134
135 Returns
135 Returns
136 -------
136 -------
137 exists : bool
137 exists : bool
138 Whether the file exists.
138 Whether the file exists.
139 """
139 """
140 path = path.strip('/')
140 path = path.strip('/')
141 nbpath = self._get_os_path(name, path=path)
141 nbpath = self._get_os_path(name, path=path)
142 return os.path.isfile(nbpath)
142 return os.path.isfile(nbpath)
143
143
144 def exists(self, name=None, path=''):
144 def exists(self, name=None, path=''):
145 """Returns True if the path [and name] exists, else returns False.
145 """Returns True if the path [and name] exists, else returns False.
146
146
147 API-style wrapper for os.path.exists
147 API-style wrapper for os.path.exists
148
148
149 Parameters
149 Parameters
150 ----------
150 ----------
151 name : string
151 name : string
152 The name of the file you are checking.
152 The name of the file you are checking.
153 path : string
153 path : string
154 The relative path to the file's directory (with '/' as separator)
154 The relative path to the file's directory (with '/' as separator)
155
155
156 Returns
156 Returns
157 -------
157 -------
158 exists : bool
158 exists : bool
159 Whether the target exists.
159 Whether the target exists.
160 """
160 """
161 path = path.strip('/')
161 path = path.strip('/')
162 os_path = self._get_os_path(name, path=path)
162 os_path = self._get_os_path(name, path=path)
163 return os.path.exists(os_path)
163 return os.path.exists(os_path)
164
164
165 def _base_model(self, name, path=''):
165 def _base_model(self, name, path=''):
166 """Build the common base of a contents model"""
166 """Build the common base of a contents model"""
167 os_path = self._get_os_path(name, path)
167 os_path = self._get_os_path(name, path)
168 info = os.stat(os_path)
168 info = os.stat(os_path)
169 last_modified = tz.utcfromtimestamp(info.st_mtime)
169 last_modified = tz.utcfromtimestamp(info.st_mtime)
170 created = tz.utcfromtimestamp(info.st_ctime)
170 created = tz.utcfromtimestamp(info.st_ctime)
171 # Create the base model.
171 # Create the base model.
172 model = {}
172 model = {}
173 model['name'] = name
173 model['name'] = name
174 model['path'] = path
174 model['path'] = path
175 model['last_modified'] = last_modified
175 model['last_modified'] = last_modified
176 model['created'] = created
176 model['created'] = created
177 model['content'] = None
177 model['content'] = None
178 model['format'] = None
178 model['format'] = None
179 model['message'] = None
179 model['message'] = None
180 return model
180 return model
181
181
182 def _dir_model(self, name, path='', content=True):
182 def _dir_model(self, name, path='', content=True):
183 """Build a model for a directory
183 """Build a model for a directory
184
184
185 if content is requested, will include a listing of the directory
185 if content is requested, will include a listing of the directory
186 """
186 """
187 os_path = self._get_os_path(name, path)
187 os_path = self._get_os_path(name, path)
188
188
189 four_o_four = u'directory does not exist: %r' % os_path
189 four_o_four = u'directory does not exist: %r' % os_path
190
190
191 if not os.path.isdir(os_path):
191 if not os.path.isdir(os_path):
192 raise web.HTTPError(404, four_o_four)
192 raise web.HTTPError(404, four_o_four)
193 elif is_hidden(os_path, self.root_dir):
193 elif is_hidden(os_path, self.root_dir):
194 self.log.info("Refusing to serve hidden directory %r, via 404 Error",
194 self.log.info("Refusing to serve hidden directory %r, via 404 Error",
195 os_path
195 os_path
196 )
196 )
197 raise web.HTTPError(404, four_o_four)
197 raise web.HTTPError(404, four_o_four)
198
198
199 if name is None:
199 if name is None:
200 if '/' in path:
200 if '/' in path:
201 path, name = path.rsplit('/', 1)
201 path, name = path.rsplit('/', 1)
202 else:
202 else:
203 name = ''
203 name = ''
204 model = self._base_model(name, path)
204 model = self._base_model(name, path)
205 model['type'] = 'directory'
205 model['type'] = 'directory'
206 dir_path = u'{}/{}'.format(path, name)
206 dir_path = u'{}/{}'.format(path, name)
207 if content:
207 if content:
208 model['content'] = contents = []
208 model['content'] = contents = []
209 for os_path in glob.glob(self._get_os_path('*', dir_path)):
209 for os_path in glob.glob(self._get_os_path('*', dir_path)):
210 name = os.path.basename(os_path)
210 name = os.path.basename(os_path)
211 # skip over broken symlinks in listing
211 # skip over broken symlinks in listing
212 if not os.path.exists(os_path):
212 if not os.path.exists(os_path):
213 self.log.warn("%s doesn't exist", os_path)
213 self.log.warn("%s doesn't exist", os_path)
214 continue
214 continue
215 if self.should_list(name) and not is_hidden(os_path, self.root_dir):
215 if self.should_list(name) and not is_hidden(os_path, self.root_dir):
216 contents.append(self.get_model(name=name, path=dir_path, content=False))
216 contents.append(self.get_model(name=name, path=dir_path, content=False))
217
217
218 model['format'] = 'json'
218 model['format'] = 'json'
219
219
220 return model
220 return model
221
221
222 def _file_model(self, name, path='', content=True):
222 def _file_model(self, name, path='', content=True):
223 """Build a model for a file
223 """Build a model for a file
224
224
225 if content is requested, include the file contents.
225 if content is requested, include the file contents.
226 UTF-8 text files will be unicode, binary files will be base64-encoded.
226 UTF-8 text files will be unicode, binary files will be base64-encoded.
227 """
227 """
228 model = self._base_model(name, path)
228 model = self._base_model(name, path)
229 model['type'] = 'file'
229 model['type'] = 'file'
230 if content:
230 if content:
231 os_path = self._get_os_path(name, path)
231 os_path = self._get_os_path(name, path)
232 with io.open(os_path, 'rb') as f:
232 with io.open(os_path, 'rb') as f:
233 bcontent = f.read()
233 bcontent = f.read()
234 try:
234 try:
235 model['content'] = bcontent.decode('utf8')
235 model['content'] = bcontent.decode('utf8')
236 except UnicodeError as e:
236 except UnicodeError as e:
237 model['content'] = base64.encodestring(bcontent).decode('ascii')
237 model['content'] = base64.encodestring(bcontent).decode('ascii')
238 model['format'] = 'base64'
238 model['format'] = 'base64'
239 else:
239 else:
240 model['format'] = 'text'
240 model['format'] = 'text'
241 return model
241 return model
242
242
243
243
244 def _notebook_model(self, name, path='', content=True):
244 def _notebook_model(self, name, path='', content=True):
245 """Build a notebook model
245 """Build a notebook model
246
246
247 if content is requested, the notebook content will be populated
247 if content is requested, the notebook content will be populated
248 as a JSON structure (not double-serialized)
248 as a JSON structure (not double-serialized)
249 """
249 """
250 model = self._base_model(name, path)
250 model = self._base_model(name, path)
251 model['type'] = 'notebook'
251 model['type'] = 'notebook'
252 if content:
252 if content:
253 os_path = self._get_os_path(name, path)
253 os_path = self._get_os_path(name, path)
254 with io.open(os_path, 'r', encoding='utf-8') as f:
254 with io.open(os_path, 'r', encoding='utf-8') as f:
255 try:
255 try:
256 nb = current.read(f, u'json')
256 nb = current.read(f, u'json')
257 except Exception as e:
257 except Exception as e:
258 raise web.HTTPError(400, u"Unreadable Notebook: %s %r" % (os_path, e))
258 raise web.HTTPError(400, u"Unreadable Notebook: %s %r" % (os_path, e))
259 self.mark_trusted_cells(nb, name, path)
259 self.mark_trusted_cells(nb, name, path)
260 model['content'] = nb
260 model['content'] = nb
261 model['format'] = 'json'
261 model['format'] = 'json'
262 self.validate_notebook_model(model)
262 self.validate_notebook_model(model)
263 return model
263 return model
264
264
265 def get_model(self, name, path='', content=True):
265 def get_model(self, name, path='', content=True):
266 """ Takes a path and name for an entity and returns its model
266 """ Takes a path and name for an entity and returns its model
267
267
268 Parameters
268 Parameters
269 ----------
269 ----------
270 name : str
270 name : str
271 the name of the target
271 the name of the target
272 path : str
272 path : str
273 the API path that describes the relative path for the target
273 the API path that describes the relative path for the target
274
274
275 Returns
275 Returns
276 -------
276 -------
277 model : dict
277 model : dict
278 the contents model. If content=True, returns the contents
278 the contents model. If content=True, returns the contents
279 of the file or directory as well.
279 of the file or directory as well.
280 """
280 """
281 path = path.strip('/')
281 path = path.strip('/')
282
282
283 if not self.exists(name=name, path=path):
283 if not self.exists(name=name, path=path):
284 raise web.HTTPError(404, u'No such file or directory: %s/%s' % (path, name))
284 raise web.HTTPError(404, u'No such file or directory: %s/%s' % (path, name))
285
285
286 os_path = self._get_os_path(name, path)
286 os_path = self._get_os_path(name, path)
287 if os.path.isdir(os_path):
287 if os.path.isdir(os_path):
288 model = self._dir_model(name, path, content)
288 model = self._dir_model(name, path, content)
289 elif name.endswith('.ipynb'):
289 elif name.endswith('.ipynb'):
290 model = self._notebook_model(name, path, content)
290 model = self._notebook_model(name, path, content)
291 else:
291 else:
292 model = self._file_model(name, path, content)
292 model = self._file_model(name, path, content)
293 return model
293 return model
294
294
295 def _save_notebook(self, os_path, model, name='', path=''):
295 def _save_notebook(self, os_path, model, name='', path=''):
296 """save a notebook file"""
296 """save a notebook file"""
297 # Save the notebook file
297 # Save the notebook file
298 nb = current.to_notebook_json(model['content'])
298 nb = current.from_dict(model['content'])
299
299
300 self.check_and_sign(nb, name, path)
300 self.check_and_sign(nb, name, path)
301
301
302 if 'name' in nb['metadata']:
302 if 'name' in nb['metadata']:
303 nb['metadata']['name'] = u''
303 nb['metadata']['name'] = u''
304
304
305 with atomic_writing(os_path, encoding='utf-8') as f:
305 with atomic_writing(os_path, encoding='utf-8') as f:
306 current.write(nb, f, version=nb.nbformat)
306 current.write(nb, f, version=nb.nbformat)
307
307
308 def _save_file(self, os_path, model, name='', path=''):
308 def _save_file(self, os_path, model, name='', path=''):
309 """save a non-notebook file"""
309 """save a non-notebook file"""
310 fmt = model.get('format', None)
310 fmt = model.get('format', None)
311 if fmt not in {'text', 'base64'}:
311 if fmt not in {'text', 'base64'}:
312 raise web.HTTPError(400, "Must specify format of file contents as 'text' or 'base64'")
312 raise web.HTTPError(400, "Must specify format of file contents as 'text' or 'base64'")
313 try:
313 try:
314 content = model['content']
314 content = model['content']
315 if fmt == 'text':
315 if fmt == 'text':
316 bcontent = content.encode('utf8')
316 bcontent = content.encode('utf8')
317 else:
317 else:
318 b64_bytes = content.encode('ascii')
318 b64_bytes = content.encode('ascii')
319 bcontent = base64.decodestring(b64_bytes)
319 bcontent = base64.decodestring(b64_bytes)
320 except Exception as e:
320 except Exception as e:
321 raise web.HTTPError(400, u'Encoding error saving %s: %s' % (os_path, e))
321 raise web.HTTPError(400, u'Encoding error saving %s: %s' % (os_path, e))
322 with atomic_writing(os_path, text=False) as f:
322 with atomic_writing(os_path, text=False) as f:
323 f.write(bcontent)
323 f.write(bcontent)
324
324
325 def _save_directory(self, os_path, model, name='', path=''):
325 def _save_directory(self, os_path, model, name='', path=''):
326 """create a directory"""
326 """create a directory"""
327 if is_hidden(os_path, self.root_dir):
327 if is_hidden(os_path, self.root_dir):
328 raise web.HTTPError(400, u'Cannot create hidden directory %r' % os_path)
328 raise web.HTTPError(400, u'Cannot create hidden directory %r' % os_path)
329 if not os.path.exists(os_path):
329 if not os.path.exists(os_path):
330 os.mkdir(os_path)
330 os.mkdir(os_path)
331 elif not os.path.isdir(os_path):
331 elif not os.path.isdir(os_path):
332 raise web.HTTPError(400, u'Not a directory: %s' % (os_path))
332 raise web.HTTPError(400, u'Not a directory: %s' % (os_path))
333 else:
333 else:
334 self.log.debug("Directory %r already exists", os_path)
334 self.log.debug("Directory %r already exists", os_path)
335
335
336 def save(self, model, name='', path=''):
336 def save(self, model, name='', path=''):
337 """Save the file model and return the model with no content."""
337 """Save the file model and return the model with no content."""
338 path = path.strip('/')
338 path = path.strip('/')
339
339
340 if 'type' not in model:
340 if 'type' not in model:
341 raise web.HTTPError(400, u'No file type provided')
341 raise web.HTTPError(400, u'No file type provided')
342 if 'content' not in model and model['type'] != 'directory':
342 if 'content' not in model and model['type'] != 'directory':
343 raise web.HTTPError(400, u'No file content provided')
343 raise web.HTTPError(400, u'No file content provided')
344
344
345 # One checkpoint should always exist
345 # One checkpoint should always exist
346 if self.file_exists(name, path) and not self.list_checkpoints(name, path):
346 if self.file_exists(name, path) and not self.list_checkpoints(name, path):
347 self.create_checkpoint(name, path)
347 self.create_checkpoint(name, path)
348
348
349 new_path = model.get('path', path).strip('/')
349 new_path = model.get('path', path).strip('/')
350 new_name = model.get('name', name)
350 new_name = model.get('name', name)
351
351
352 if path != new_path or name != new_name:
352 if path != new_path or name != new_name:
353 self.rename(name, path, new_name, new_path)
353 self.rename(name, path, new_name, new_path)
354
354
355 os_path = self._get_os_path(new_name, new_path)
355 os_path = self._get_os_path(new_name, new_path)
356 self.log.debug("Saving %s", os_path)
356 self.log.debug("Saving %s", os_path)
357 try:
357 try:
358 if model['type'] == 'notebook':
358 if model['type'] == 'notebook':
359 self._save_notebook(os_path, model, new_name, new_path)
359 self._save_notebook(os_path, model, new_name, new_path)
360 elif model['type'] == 'file':
360 elif model['type'] == 'file':
361 self._save_file(os_path, model, new_name, new_path)
361 self._save_file(os_path, model, new_name, new_path)
362 elif model['type'] == 'directory':
362 elif model['type'] == 'directory':
363 self._save_directory(os_path, model, new_name, new_path)
363 self._save_directory(os_path, model, new_name, new_path)
364 else:
364 else:
365 raise web.HTTPError(400, "Unhandled contents type: %s" % model['type'])
365 raise web.HTTPError(400, "Unhandled contents type: %s" % model['type'])
366 except web.HTTPError:
366 except web.HTTPError:
367 raise
367 raise
368 except Exception as e:
368 except Exception as e:
369 raise web.HTTPError(400, u'Unexpected error while saving file: %s %s' % (os_path, e))
369 raise web.HTTPError(400, u'Unexpected error while saving file: %s %s' % (os_path, e))
370
370
371 validation_message = None
371 validation_message = None
372 if model['type'] == 'notebook':
372 if model['type'] == 'notebook':
373 self.validate_notebook_model(model)
373 self.validate_notebook_model(model)
374 validation_message = model.get('message', None)
374 validation_message = model.get('message', None)
375
375
376 model = self.get_model(new_name, new_path, content=False)
376 model = self.get_model(new_name, new_path, content=False)
377 if validation_message:
377 if validation_message:
378 model['message'] = validation_message
378 model['message'] = validation_message
379 return model
379 return model
380
380
381 def update(self, model, name, path=''):
381 def update(self, model, name, path=''):
382 """Update the file's path and/or name
382 """Update the file's path and/or name
383
383
384 For use in PATCH requests, to enable renaming a file without
384 For use in PATCH requests, to enable renaming a file without
385 re-uploading its contents. Only used for renaming at the moment.
385 re-uploading its contents. Only used for renaming at the moment.
386 """
386 """
387 path = path.strip('/')
387 path = path.strip('/')
388 new_name = model.get('name', name)
388 new_name = model.get('name', name)
389 new_path = model.get('path', path).strip('/')
389 new_path = model.get('path', path).strip('/')
390 if path != new_path or name != new_name:
390 if path != new_path or name != new_name:
391 self.rename(name, path, new_name, new_path)
391 self.rename(name, path, new_name, new_path)
392 model = self.get_model(new_name, new_path, content=False)
392 model = self.get_model(new_name, new_path, content=False)
393 return model
393 return model
394
394
395 def delete(self, name, path=''):
395 def delete(self, name, path=''):
396 """Delete file by name and path."""
396 """Delete file by name and path."""
397 path = path.strip('/')
397 path = path.strip('/')
398 os_path = self._get_os_path(name, path)
398 os_path = self._get_os_path(name, path)
399 rm = os.unlink
399 rm = os.unlink
400 if os.path.isdir(os_path):
400 if os.path.isdir(os_path):
401 listing = os.listdir(os_path)
401 listing = os.listdir(os_path)
402 # don't delete non-empty directories (checkpoints dir doesn't count)
402 # don't delete non-empty directories (checkpoints dir doesn't count)
403 if listing and listing != [self.checkpoint_dir]:
403 if listing and listing != [self.checkpoint_dir]:
404 raise web.HTTPError(400, u'Directory %s not empty' % os_path)
404 raise web.HTTPError(400, u'Directory %s not empty' % os_path)
405 elif not os.path.isfile(os_path):
405 elif not os.path.isfile(os_path):
406 raise web.HTTPError(404, u'File does not exist: %s' % os_path)
406 raise web.HTTPError(404, u'File does not exist: %s' % os_path)
407
407
408 # clear checkpoints
408 # clear checkpoints
409 for checkpoint in self.list_checkpoints(name, path):
409 for checkpoint in self.list_checkpoints(name, path):
410 checkpoint_id = checkpoint['id']
410 checkpoint_id = checkpoint['id']
411 cp_path = self.get_checkpoint_path(checkpoint_id, name, path)
411 cp_path = self.get_checkpoint_path(checkpoint_id, name, path)
412 if os.path.isfile(cp_path):
412 if os.path.isfile(cp_path):
413 self.log.debug("Unlinking checkpoint %s", cp_path)
413 self.log.debug("Unlinking checkpoint %s", cp_path)
414 os.unlink(cp_path)
414 os.unlink(cp_path)
415
415
416 if os.path.isdir(os_path):
416 if os.path.isdir(os_path):
417 self.log.debug("Removing directory %s", os_path)
417 self.log.debug("Removing directory %s", os_path)
418 shutil.rmtree(os_path)
418 shutil.rmtree(os_path)
419 else:
419 else:
420 self.log.debug("Unlinking file %s", os_path)
420 self.log.debug("Unlinking file %s", os_path)
421 rm(os_path)
421 rm(os_path)
422
422
423 def rename(self, old_name, old_path, new_name, new_path):
423 def rename(self, old_name, old_path, new_name, new_path):
424 """Rename a file."""
424 """Rename a file."""
425 old_path = old_path.strip('/')
425 old_path = old_path.strip('/')
426 new_path = new_path.strip('/')
426 new_path = new_path.strip('/')
427 if new_name == old_name and new_path == old_path:
427 if new_name == old_name and new_path == old_path:
428 return
428 return
429
429
430 new_os_path = self._get_os_path(new_name, new_path)
430 new_os_path = self._get_os_path(new_name, new_path)
431 old_os_path = self._get_os_path(old_name, old_path)
431 old_os_path = self._get_os_path(old_name, old_path)
432
432
433 # Should we proceed with the move?
433 # Should we proceed with the move?
434 if os.path.isfile(new_os_path):
434 if os.path.isfile(new_os_path):
435 raise web.HTTPError(409, u'File with name already exists: %s' % new_os_path)
435 raise web.HTTPError(409, u'File with name already exists: %s' % new_os_path)
436
436
437 # Move the file
437 # Move the file
438 try:
438 try:
439 shutil.move(old_os_path, new_os_path)
439 shutil.move(old_os_path, new_os_path)
440 except Exception as e:
440 except Exception as e:
441 raise web.HTTPError(500, u'Unknown error renaming file: %s %s' % (old_os_path, e))
441 raise web.HTTPError(500, u'Unknown error renaming file: %s %s' % (old_os_path, e))
442
442
443 # Move the checkpoints
443 # Move the checkpoints
444 old_checkpoints = self.list_checkpoints(old_name, old_path)
444 old_checkpoints = self.list_checkpoints(old_name, old_path)
445 for cp in old_checkpoints:
445 for cp in old_checkpoints:
446 checkpoint_id = cp['id']
446 checkpoint_id = cp['id']
447 old_cp_path = self.get_checkpoint_path(checkpoint_id, old_name, old_path)
447 old_cp_path = self.get_checkpoint_path(checkpoint_id, old_name, old_path)
448 new_cp_path = self.get_checkpoint_path(checkpoint_id, new_name, new_path)
448 new_cp_path = self.get_checkpoint_path(checkpoint_id, new_name, new_path)
449 if os.path.isfile(old_cp_path):
449 if os.path.isfile(old_cp_path):
450 self.log.debug("Renaming checkpoint %s -> %s", old_cp_path, new_cp_path)
450 self.log.debug("Renaming checkpoint %s -> %s", old_cp_path, new_cp_path)
451 shutil.move(old_cp_path, new_cp_path)
451 shutil.move(old_cp_path, new_cp_path)
452
452
453 # Checkpoint-related utilities
453 # Checkpoint-related utilities
454
454
455 def get_checkpoint_path(self, checkpoint_id, name, path=''):
455 def get_checkpoint_path(self, checkpoint_id, name, path=''):
456 """find the path to a checkpoint"""
456 """find the path to a checkpoint"""
457 path = path.strip('/')
457 path = path.strip('/')
458 basename, ext = os.path.splitext(name)
458 basename, ext = os.path.splitext(name)
459 filename = u"{name}-{checkpoint_id}{ext}".format(
459 filename = u"{name}-{checkpoint_id}{ext}".format(
460 name=basename,
460 name=basename,
461 checkpoint_id=checkpoint_id,
461 checkpoint_id=checkpoint_id,
462 ext=ext,
462 ext=ext,
463 )
463 )
464 os_path = self._get_os_path(path=path)
464 os_path = self._get_os_path(path=path)
465 cp_dir = os.path.join(os_path, self.checkpoint_dir)
465 cp_dir = os.path.join(os_path, self.checkpoint_dir)
466 ensure_dir_exists(cp_dir)
466 ensure_dir_exists(cp_dir)
467 cp_path = os.path.join(cp_dir, filename)
467 cp_path = os.path.join(cp_dir, filename)
468 return cp_path
468 return cp_path
469
469
470 def get_checkpoint_model(self, checkpoint_id, name, path=''):
470 def get_checkpoint_model(self, checkpoint_id, name, path=''):
471 """construct the info dict for a given checkpoint"""
471 """construct the info dict for a given checkpoint"""
472 path = path.strip('/')
472 path = path.strip('/')
473 cp_path = self.get_checkpoint_path(checkpoint_id, name, path)
473 cp_path = self.get_checkpoint_path(checkpoint_id, name, path)
474 stats = os.stat(cp_path)
474 stats = os.stat(cp_path)
475 last_modified = tz.utcfromtimestamp(stats.st_mtime)
475 last_modified = tz.utcfromtimestamp(stats.st_mtime)
476 info = dict(
476 info = dict(
477 id = checkpoint_id,
477 id = checkpoint_id,
478 last_modified = last_modified,
478 last_modified = last_modified,
479 )
479 )
480 return info
480 return info
481
481
482 # public checkpoint API
482 # public checkpoint API
483
483
484 def create_checkpoint(self, name, path=''):
484 def create_checkpoint(self, name, path=''):
485 """Create a checkpoint from the current state of a file"""
485 """Create a checkpoint from the current state of a file"""
486 path = path.strip('/')
486 path = path.strip('/')
487 src_path = self._get_os_path(name, path)
487 src_path = self._get_os_path(name, path)
488 # only the one checkpoint ID:
488 # only the one checkpoint ID:
489 checkpoint_id = u"checkpoint"
489 checkpoint_id = u"checkpoint"
490 cp_path = self.get_checkpoint_path(checkpoint_id, name, path)
490 cp_path = self.get_checkpoint_path(checkpoint_id, name, path)
491 self.log.debug("creating checkpoint for %s", name)
491 self.log.debug("creating checkpoint for %s", name)
492 self._copy(src_path, cp_path)
492 self._copy(src_path, cp_path)
493
493
494 # return the checkpoint info
494 # return the checkpoint info
495 return self.get_checkpoint_model(checkpoint_id, name, path)
495 return self.get_checkpoint_model(checkpoint_id, name, path)
496
496
497 def list_checkpoints(self, name, path=''):
497 def list_checkpoints(self, name, path=''):
498 """list the checkpoints for a given file
498 """list the checkpoints for a given file
499
499
500 This contents manager currently only supports one checkpoint per file.
500 This contents manager currently only supports one checkpoint per file.
501 """
501 """
502 path = path.strip('/')
502 path = path.strip('/')
503 checkpoint_id = "checkpoint"
503 checkpoint_id = "checkpoint"
504 os_path = self.get_checkpoint_path(checkpoint_id, name, path)
504 os_path = self.get_checkpoint_path(checkpoint_id, name, path)
505 if not os.path.exists(os_path):
505 if not os.path.exists(os_path):
506 return []
506 return []
507 else:
507 else:
508 return [self.get_checkpoint_model(checkpoint_id, name, path)]
508 return [self.get_checkpoint_model(checkpoint_id, name, path)]
509
509
510
510
511 def restore_checkpoint(self, checkpoint_id, name, path=''):
511 def restore_checkpoint(self, checkpoint_id, name, path=''):
512 """restore a file to a checkpointed state"""
512 """restore a file to a checkpointed state"""
513 path = path.strip('/')
513 path = path.strip('/')
514 self.log.info("restoring %s from checkpoint %s", name, checkpoint_id)
514 self.log.info("restoring %s from checkpoint %s", name, checkpoint_id)
515 nb_path = self._get_os_path(name, path)
515 nb_path = self._get_os_path(name, path)
516 cp_path = self.get_checkpoint_path(checkpoint_id, name, path)
516 cp_path = self.get_checkpoint_path(checkpoint_id, name, path)
517 if not os.path.isfile(cp_path):
517 if not os.path.isfile(cp_path):
518 self.log.debug("checkpoint file does not exist: %s", cp_path)
518 self.log.debug("checkpoint file does not exist: %s", cp_path)
519 raise web.HTTPError(404,
519 raise web.HTTPError(404,
520 u'checkpoint does not exist: %s-%s' % (name, checkpoint_id)
520 u'checkpoint does not exist: %s-%s' % (name, checkpoint_id)
521 )
521 )
522 # ensure notebook is readable (never restore from an unreadable notebook)
522 # ensure notebook is readable (never restore from an unreadable notebook)
523 if cp_path.endswith('.ipynb'):
523 if cp_path.endswith('.ipynb'):
524 with io.open(cp_path, 'r', encoding='utf-8') as f:
524 with io.open(cp_path, 'r', encoding='utf-8') as f:
525 current.read(f, u'json')
525 current.read(f, u'json')
526 self._copy(cp_path, nb_path)
526 self._copy(cp_path, nb_path)
527 self.log.debug("copying %s -> %s", cp_path, nb_path)
527 self.log.debug("copying %s -> %s", cp_path, nb_path)
528
528
529 def delete_checkpoint(self, checkpoint_id, name, path=''):
529 def delete_checkpoint(self, checkpoint_id, name, path=''):
530 """delete a file's checkpoint"""
530 """delete a file's checkpoint"""
531 path = path.strip('/')
531 path = path.strip('/')
532 cp_path = self.get_checkpoint_path(checkpoint_id, name, path)
532 cp_path = self.get_checkpoint_path(checkpoint_id, name, path)
533 if not os.path.isfile(cp_path):
533 if not os.path.isfile(cp_path):
534 raise web.HTTPError(404,
534 raise web.HTTPError(404,
535 u'Checkpoint does not exist: %s%s-%s' % (path, name, checkpoint_id)
535 u'Checkpoint does not exist: %s%s-%s' % (path, name, checkpoint_id)
536 )
536 )
537 self.log.debug("unlinking %s", cp_path)
537 self.log.debug("unlinking %s", cp_path)
538 os.unlink(cp_path)
538 os.unlink(cp_path)
539
539
540 def info_string(self):
540 def info_string(self):
541 return "Serving notebooks from local directory: %s" % self.root_dir
541 return "Serving notebooks from local directory: %s" % self.root_dir
542
542
543 def get_kernel_path(self, name, path='', model=None):
543 def get_kernel_path(self, name, path='', model=None):
544 """Return the initial working dir a kernel associated with a given notebook"""
544 """Return the initial working dir a kernel associated with a given notebook"""
545 return os.path.join(self.root_dir, path)
545 return os.path.join(self.root_dir, path)
@@ -1,480 +1,480 b''
1 # coding: utf-8
1 # coding: utf-8
2 """Test the contents webservice API."""
2 """Test the contents webservice API."""
3
3
4 import base64
4 import base64
5 import io
5 import io
6 import json
6 import json
7 import os
7 import os
8 import shutil
8 import shutil
9 from unicodedata import normalize
9 from unicodedata import normalize
10
10
11 pjoin = os.path.join
11 pjoin = os.path.join
12
12
13 import requests
13 import requests
14
14
15 from IPython.html.utils import url_path_join, url_escape
15 from IPython.html.utils import url_path_join, url_escape
16 from IPython.html.tests.launchnotebook import NotebookTestBase, assert_http_error
16 from IPython.html.tests.launchnotebook import NotebookTestBase, assert_http_error
17 from IPython.nbformat import current
17 from IPython.nbformat import current
18 from IPython.nbformat.current import (new_notebook, write, read,
18 from IPython.nbformat.current import (new_notebook, write, read,
19 new_markdown_cell, to_notebook_json)
19 new_markdown_cell, from_dict)
20 from IPython.nbformat import v2
20 from IPython.nbformat import v2
21 from IPython.utils import py3compat
21 from IPython.utils import py3compat
22 from IPython.utils.data import uniq_stable
22 from IPython.utils.data import uniq_stable
23
23
24
24
25 def notebooks_only(dir_model):
25 def notebooks_only(dir_model):
26 return [nb for nb in dir_model['content'] if nb['type']=='notebook']
26 return [nb for nb in dir_model['content'] if nb['type']=='notebook']
27
27
28 def dirs_only(dir_model):
28 def dirs_only(dir_model):
29 return [x for x in dir_model['content'] if x['type']=='directory']
29 return [x for x in dir_model['content'] if x['type']=='directory']
30
30
31
31
32 class API(object):
32 class API(object):
33 """Wrapper for contents API calls."""
33 """Wrapper for contents API calls."""
34 def __init__(self, base_url):
34 def __init__(self, base_url):
35 self.base_url = base_url
35 self.base_url = base_url
36
36
37 def _req(self, verb, path, body=None):
37 def _req(self, verb, path, body=None):
38 response = requests.request(verb,
38 response = requests.request(verb,
39 url_path_join(self.base_url, 'api/contents', path),
39 url_path_join(self.base_url, 'api/contents', path),
40 data=body,
40 data=body,
41 )
41 )
42 response.raise_for_status()
42 response.raise_for_status()
43 return response
43 return response
44
44
45 def list(self, path='/'):
45 def list(self, path='/'):
46 return self._req('GET', path)
46 return self._req('GET', path)
47
47
48 def read(self, name, path='/'):
48 def read(self, name, path='/'):
49 return self._req('GET', url_path_join(path, name))
49 return self._req('GET', url_path_join(path, name))
50
50
51 def create_untitled(self, path='/', ext=None):
51 def create_untitled(self, path='/', ext=None):
52 body = None
52 body = None
53 if ext:
53 if ext:
54 body = json.dumps({'ext': ext})
54 body = json.dumps({'ext': ext})
55 return self._req('POST', path, body)
55 return self._req('POST', path, body)
56
56
57 def upload_untitled(self, body, path='/'):
57 def upload_untitled(self, body, path='/'):
58 return self._req('POST', path, body)
58 return self._req('POST', path, body)
59
59
60 def copy_untitled(self, copy_from, path='/'):
60 def copy_untitled(self, copy_from, path='/'):
61 body = json.dumps({'copy_from':copy_from})
61 body = json.dumps({'copy_from':copy_from})
62 return self._req('POST', path, body)
62 return self._req('POST', path, body)
63
63
64 def create(self, name, path='/'):
64 def create(self, name, path='/'):
65 return self._req('PUT', url_path_join(path, name))
65 return self._req('PUT', url_path_join(path, name))
66
66
67 def upload(self, name, body, path='/'):
67 def upload(self, name, body, path='/'):
68 return self._req('PUT', url_path_join(path, name), body)
68 return self._req('PUT', url_path_join(path, name), body)
69
69
70 def mkdir(self, name, path='/'):
70 def mkdir(self, name, path='/'):
71 return self._req('PUT', url_path_join(path, name), json.dumps({'type': 'directory'}))
71 return self._req('PUT', url_path_join(path, name), json.dumps({'type': 'directory'}))
72
72
73 def copy(self, copy_from, copy_to, path='/'):
73 def copy(self, copy_from, copy_to, path='/'):
74 body = json.dumps({'copy_from':copy_from})
74 body = json.dumps({'copy_from':copy_from})
75 return self._req('PUT', url_path_join(path, copy_to), body)
75 return self._req('PUT', url_path_join(path, copy_to), body)
76
76
77 def save(self, name, body, path='/'):
77 def save(self, name, body, path='/'):
78 return self._req('PUT', url_path_join(path, name), body)
78 return self._req('PUT', url_path_join(path, name), body)
79
79
80 def delete(self, name, path='/'):
80 def delete(self, name, path='/'):
81 return self._req('DELETE', url_path_join(path, name))
81 return self._req('DELETE', url_path_join(path, name))
82
82
83 def rename(self, name, path, new_name):
83 def rename(self, name, path, new_name):
84 body = json.dumps({'name': new_name})
84 body = json.dumps({'name': new_name})
85 return self._req('PATCH', url_path_join(path, name), body)
85 return self._req('PATCH', url_path_join(path, name), body)
86
86
87 def get_checkpoints(self, name, path):
87 def get_checkpoints(self, name, path):
88 return self._req('GET', url_path_join(path, name, 'checkpoints'))
88 return self._req('GET', url_path_join(path, name, 'checkpoints'))
89
89
90 def new_checkpoint(self, name, path):
90 def new_checkpoint(self, name, path):
91 return self._req('POST', url_path_join(path, name, 'checkpoints'))
91 return self._req('POST', url_path_join(path, name, 'checkpoints'))
92
92
93 def restore_checkpoint(self, name, path, checkpoint_id):
93 def restore_checkpoint(self, name, path, checkpoint_id):
94 return self._req('POST', url_path_join(path, name, 'checkpoints', checkpoint_id))
94 return self._req('POST', url_path_join(path, name, 'checkpoints', checkpoint_id))
95
95
96 def delete_checkpoint(self, name, path, checkpoint_id):
96 def delete_checkpoint(self, name, path, checkpoint_id):
97 return self._req('DELETE', url_path_join(path, name, 'checkpoints', checkpoint_id))
97 return self._req('DELETE', url_path_join(path, name, 'checkpoints', checkpoint_id))
98
98
99 class APITest(NotebookTestBase):
99 class APITest(NotebookTestBase):
100 """Test the kernels web service API"""
100 """Test the kernels web service API"""
101 dirs_nbs = [('', 'inroot'),
101 dirs_nbs = [('', 'inroot'),
102 ('Directory with spaces in', 'inspace'),
102 ('Directory with spaces in', 'inspace'),
103 (u'unicodΓ©', 'innonascii'),
103 (u'unicodΓ©', 'innonascii'),
104 ('foo', 'a'),
104 ('foo', 'a'),
105 ('foo', 'b'),
105 ('foo', 'b'),
106 ('foo', 'name with spaces'),
106 ('foo', 'name with spaces'),
107 ('foo', u'unicodΓ©'),
107 ('foo', u'unicodΓ©'),
108 ('foo/bar', 'baz'),
108 ('foo/bar', 'baz'),
109 ('ordering', 'A'),
109 ('ordering', 'A'),
110 ('ordering', 'b'),
110 ('ordering', 'b'),
111 ('ordering', 'C'),
111 ('ordering', 'C'),
112 (u'Γ₯ b', u'Γ§ d'),
112 (u'Γ₯ b', u'Γ§ d'),
113 ]
113 ]
114 hidden_dirs = ['.hidden', '__pycache__']
114 hidden_dirs = ['.hidden', '__pycache__']
115
115
116 dirs = uniq_stable([py3compat.cast_unicode(d) for (d,n) in dirs_nbs])
116 dirs = uniq_stable([py3compat.cast_unicode(d) for (d,n) in dirs_nbs])
117 del dirs[0] # remove ''
117 del dirs[0] # remove ''
118 top_level_dirs = {normalize('NFC', d.split('/')[0]) for d in dirs}
118 top_level_dirs = {normalize('NFC', d.split('/')[0]) for d in dirs}
119
119
120 @staticmethod
120 @staticmethod
121 def _blob_for_name(name):
121 def _blob_for_name(name):
122 return name.encode('utf-8') + b'\xFF'
122 return name.encode('utf-8') + b'\xFF'
123
123
124 @staticmethod
124 @staticmethod
125 def _txt_for_name(name):
125 def _txt_for_name(name):
126 return u'%s text file' % name
126 return u'%s text file' % name
127
127
128 def setUp(self):
128 def setUp(self):
129 nbdir = self.notebook_dir.name
129 nbdir = self.notebook_dir.name
130 self.blob = os.urandom(100)
130 self.blob = os.urandom(100)
131 self.b64_blob = base64.encodestring(self.blob).decode('ascii')
131 self.b64_blob = base64.encodestring(self.blob).decode('ascii')
132
132
133
133
134
134
135 for d in (self.dirs + self.hidden_dirs):
135 for d in (self.dirs + self.hidden_dirs):
136 d.replace('/', os.sep)
136 d.replace('/', os.sep)
137 if not os.path.isdir(pjoin(nbdir, d)):
137 if not os.path.isdir(pjoin(nbdir, d)):
138 os.mkdir(pjoin(nbdir, d))
138 os.mkdir(pjoin(nbdir, d))
139
139
140 for d, name in self.dirs_nbs:
140 for d, name in self.dirs_nbs:
141 d = d.replace('/', os.sep)
141 d = d.replace('/', os.sep)
142 # create a notebook
142 # create a notebook
143 with io.open(pjoin(nbdir, d, '%s.ipynb' % name), 'w',
143 with io.open(pjoin(nbdir, d, '%s.ipynb' % name), 'w',
144 encoding='utf-8') as f:
144 encoding='utf-8') as f:
145 nb = new_notebook()
145 nb = new_notebook()
146 write(nb, f, format='ipynb')
146 write(nb, f, format='ipynb')
147
147
148 # create a text file
148 # create a text file
149 with io.open(pjoin(nbdir, d, '%s.txt' % name), 'w',
149 with io.open(pjoin(nbdir, d, '%s.txt' % name), 'w',
150 encoding='utf-8') as f:
150 encoding='utf-8') as f:
151 f.write(self._txt_for_name(name))
151 f.write(self._txt_for_name(name))
152
152
153 # create a binary file
153 # create a binary file
154 with io.open(pjoin(nbdir, d, '%s.blob' % name), 'wb') as f:
154 with io.open(pjoin(nbdir, d, '%s.blob' % name), 'wb') as f:
155 f.write(self._blob_for_name(name))
155 f.write(self._blob_for_name(name))
156
156
157 self.api = API(self.base_url())
157 self.api = API(self.base_url())
158
158
159 def tearDown(self):
159 def tearDown(self):
160 nbdir = self.notebook_dir.name
160 nbdir = self.notebook_dir.name
161
161
162 for dname in (list(self.top_level_dirs) + self.hidden_dirs):
162 for dname in (list(self.top_level_dirs) + self.hidden_dirs):
163 shutil.rmtree(pjoin(nbdir, dname), ignore_errors=True)
163 shutil.rmtree(pjoin(nbdir, dname), ignore_errors=True)
164
164
165 if os.path.isfile(pjoin(nbdir, 'inroot.ipynb')):
165 if os.path.isfile(pjoin(nbdir, 'inroot.ipynb')):
166 os.unlink(pjoin(nbdir, 'inroot.ipynb'))
166 os.unlink(pjoin(nbdir, 'inroot.ipynb'))
167
167
168 def test_list_notebooks(self):
168 def test_list_notebooks(self):
169 nbs = notebooks_only(self.api.list().json())
169 nbs = notebooks_only(self.api.list().json())
170 self.assertEqual(len(nbs), 1)
170 self.assertEqual(len(nbs), 1)
171 self.assertEqual(nbs[0]['name'], 'inroot.ipynb')
171 self.assertEqual(nbs[0]['name'], 'inroot.ipynb')
172
172
173 nbs = notebooks_only(self.api.list('/Directory with spaces in/').json())
173 nbs = notebooks_only(self.api.list('/Directory with spaces in/').json())
174 self.assertEqual(len(nbs), 1)
174 self.assertEqual(len(nbs), 1)
175 self.assertEqual(nbs[0]['name'], 'inspace.ipynb')
175 self.assertEqual(nbs[0]['name'], 'inspace.ipynb')
176
176
177 nbs = notebooks_only(self.api.list(u'/unicodΓ©/').json())
177 nbs = notebooks_only(self.api.list(u'/unicodΓ©/').json())
178 self.assertEqual(len(nbs), 1)
178 self.assertEqual(len(nbs), 1)
179 self.assertEqual(nbs[0]['name'], 'innonascii.ipynb')
179 self.assertEqual(nbs[0]['name'], 'innonascii.ipynb')
180 self.assertEqual(nbs[0]['path'], u'unicodΓ©')
180 self.assertEqual(nbs[0]['path'], u'unicodΓ©')
181
181
182 nbs = notebooks_only(self.api.list('/foo/bar/').json())
182 nbs = notebooks_only(self.api.list('/foo/bar/').json())
183 self.assertEqual(len(nbs), 1)
183 self.assertEqual(len(nbs), 1)
184 self.assertEqual(nbs[0]['name'], 'baz.ipynb')
184 self.assertEqual(nbs[0]['name'], 'baz.ipynb')
185 self.assertEqual(nbs[0]['path'], 'foo/bar')
185 self.assertEqual(nbs[0]['path'], 'foo/bar')
186
186
187 nbs = notebooks_only(self.api.list('foo').json())
187 nbs = notebooks_only(self.api.list('foo').json())
188 self.assertEqual(len(nbs), 4)
188 self.assertEqual(len(nbs), 4)
189 nbnames = { normalize('NFC', n['name']) for n in nbs }
189 nbnames = { normalize('NFC', n['name']) for n in nbs }
190 expected = [ u'a.ipynb', u'b.ipynb', u'name with spaces.ipynb', u'unicodΓ©.ipynb']
190 expected = [ u'a.ipynb', u'b.ipynb', u'name with spaces.ipynb', u'unicodΓ©.ipynb']
191 expected = { normalize('NFC', name) for name in expected }
191 expected = { normalize('NFC', name) for name in expected }
192 self.assertEqual(nbnames, expected)
192 self.assertEqual(nbnames, expected)
193
193
194 nbs = notebooks_only(self.api.list('ordering').json())
194 nbs = notebooks_only(self.api.list('ordering').json())
195 nbnames = [n['name'] for n in nbs]
195 nbnames = [n['name'] for n in nbs]
196 expected = ['A.ipynb', 'b.ipynb', 'C.ipynb']
196 expected = ['A.ipynb', 'b.ipynb', 'C.ipynb']
197 self.assertEqual(nbnames, expected)
197 self.assertEqual(nbnames, expected)
198
198
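The set comparison above normalizes both sides to NFC first, because the same filename can come back from the filesystem or the API in a different Unicode normalization form (notably NFD on OS X). A minimal standalone sketch of that idea; the filenames are invented for illustration:

from unicodedata import normalize

composed = u'unicod\xe9.ipynb'        # NFC: precomposed e-acute
decomposed = u'unicode\u0301.ipynb'   # NFD: 'e' followed by a combining acute accent

assert composed != decomposed                                      # raw comparison fails
assert normalize('NFC', composed) == normalize('NFC', decomposed)  # normalized comparison passes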
199 def test_list_dirs(self):
199 def test_list_dirs(self):
200 dirs = dirs_only(self.api.list().json())
200 dirs = dirs_only(self.api.list().json())
201 dir_names = {normalize('NFC', d['name']) for d in dirs}
201 dir_names = {normalize('NFC', d['name']) for d in dirs}
202 self.assertEqual(dir_names, self.top_level_dirs) # Excluding hidden dirs
202 self.assertEqual(dir_names, self.top_level_dirs) # Excluding hidden dirs
203
203
204 def test_list_nonexistant_dir(self):
204 def test_list_nonexistant_dir(self):
205 with assert_http_error(404):
205 with assert_http_error(404):
206 self.api.list('nonexistant')
206 self.api.list('nonexistant')
207
207
208 def test_get_nb_contents(self):
208 def test_get_nb_contents(self):
209 for d, name in self.dirs_nbs:
209 for d, name in self.dirs_nbs:
210 nb = self.api.read('%s.ipynb' % name, d+'/').json()
210 nb = self.api.read('%s.ipynb' % name, d+'/').json()
211 self.assertEqual(nb['name'], u'%s.ipynb' % name)
211 self.assertEqual(nb['name'], u'%s.ipynb' % name)
212 self.assertEqual(nb['type'], 'notebook')
212 self.assertEqual(nb['type'], 'notebook')
213 self.assertIn('content', nb)
213 self.assertIn('content', nb)
214 self.assertEqual(nb['format'], 'json')
214 self.assertEqual(nb['format'], 'json')
215 self.assertIn('content', nb)
215 self.assertIn('content', nb)
216 self.assertIn('metadata', nb['content'])
216 self.assertIn('metadata', nb['content'])
217 self.assertIsInstance(nb['content']['metadata'], dict)
217 self.assertIsInstance(nb['content']['metadata'], dict)
218
218
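The assertions above pin down the shape of a notebook model. A hedged sketch of a client-side check built only on those asserted fields; the example dict is invented, not actual server output:

def check_notebook_model(model):
    """Check the fields test_get_nb_contents relies on."""
    assert model['type'] == 'notebook'
    assert model['format'] == 'json'
    assert isinstance(model['content'], dict)
    assert isinstance(model['content']['metadata'], dict)

# Invented example of the rough shape a read can return:
check_notebook_model({
    'name': 'a.ipynb',
    'path': 'foo',
    'type': 'notebook',
    'format': 'json',
    'content': {'metadata': {}, 'cells': [], 'nbformat': 4, 'nbformat_minor': 0},
})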
219 def test_get_contents_no_such_file(self):
219 def test_get_contents_no_such_file(self):
220 # Name that doesn't exist - should be a 404
220 # Name that doesn't exist - should be a 404
221 with assert_http_error(404):
221 with assert_http_error(404):
222 self.api.read('q.ipynb', 'foo')
222 self.api.read('q.ipynb', 'foo')
223
223
224 def test_get_text_file_contents(self):
224 def test_get_text_file_contents(self):
225 for d, name in self.dirs_nbs:
225 for d, name in self.dirs_nbs:
226 model = self.api.read(u'%s.txt' % name, d+'/').json()
226 model = self.api.read(u'%s.txt' % name, d+'/').json()
227 self.assertEqual(model['name'], u'%s.txt' % name)
227 self.assertEqual(model['name'], u'%s.txt' % name)
228 self.assertIn('content', model)
228 self.assertIn('content', model)
229 self.assertEqual(model['format'], 'text')
229 self.assertEqual(model['format'], 'text')
230 self.assertEqual(model['type'], 'file')
230 self.assertEqual(model['type'], 'file')
231 self.assertEqual(model['content'], self._txt_for_name(name))
231 self.assertEqual(model['content'], self._txt_for_name(name))
232
232
233 # Name that doesn't exist - should be a 404
233 # Name that doesn't exist - should be a 404
234 with assert_http_error(404):
234 with assert_http_error(404):
235 self.api.read('q.txt', 'foo')
235 self.api.read('q.txt', 'foo')
236
236
237 def test_get_binary_file_contents(self):
237 def test_get_binary_file_contents(self):
238 for d, name in self.dirs_nbs:
238 for d, name in self.dirs_nbs:
239 model = self.api.read(u'%s.blob' % name, d+'/').json()
239 model = self.api.read(u'%s.blob' % name, d+'/').json()
240 self.assertEqual(model['name'], u'%s.blob' % name)
240 self.assertEqual(model['name'], u'%s.blob' % name)
241 self.assertIn('content', model)
241 self.assertIn('content', model)
242 self.assertEqual(model['format'], 'base64')
242 self.assertEqual(model['format'], 'base64')
243 self.assertEqual(model['type'], 'file')
243 self.assertEqual(model['type'], 'file')
244 b64_data = base64.encodestring(self._blob_for_name(name)).decode('ascii')
244 b64_data = base64.encodestring(self._blob_for_name(name)).decode('ascii')
245 self.assertEqual(model['content'], b64_data)
245 self.assertEqual(model['content'], b64_data)
246
246
247 # Name that doesn't exist - should be a 404
247 # Name that doesn't exist - should be a 404
248 with assert_http_error(404):
248 with assert_http_error(404):
249 self.api.read('q.txt', 'foo')
249 self.api.read('q.txt', 'foo')
250
250
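The text and binary tests read back the same kind of file model and only differ in 'format'. A small sketch of how a client could decode either variant; the example models are made up:

import base64

def file_model_bytes(model):
    """Return the file body as bytes, decoding according to the model's 'format'."""
    if model['format'] == 'base64':
        return base64.b64decode(model['content'].encode('ascii'))
    if model['format'] == 'text':
        return model['content'].encode('utf-8')
    raise ValueError('unexpected format: %r' % model['format'])

assert file_model_bytes({'format': 'base64', 'content': u'/2Jsb2I='}) == b'\xffblob'
assert file_model_bytes({'format': 'text', 'content': u'hi'}) == b'hi'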
251 def _check_created(self, resp, name, path, type='notebook'):
251 def _check_created(self, resp, name, path, type='notebook'):
252 self.assertEqual(resp.status_code, 201)
252 self.assertEqual(resp.status_code, 201)
253 location_header = py3compat.str_to_unicode(resp.headers['Location'])
253 location_header = py3compat.str_to_unicode(resp.headers['Location'])
254 self.assertEqual(location_header, url_escape(url_path_join(u'/api/contents', path, name)))
254 self.assertEqual(location_header, url_escape(url_path_join(u'/api/contents', path, name)))
255 rjson = resp.json()
255 rjson = resp.json()
256 self.assertEqual(rjson['name'], name)
256 self.assertEqual(rjson['name'], name)
257 self.assertEqual(rjson['path'], path)
257 self.assertEqual(rjson['path'], path)
258 self.assertEqual(rjson['type'], type)
258 self.assertEqual(rjson['type'], type)
259 isright = os.path.isdir if type == 'directory' else os.path.isfile
259 isright = os.path.isdir if type == 'directory' else os.path.isfile
260 assert isright(pjoin(
260 assert isright(pjoin(
261 self.notebook_dir.name,
261 self.notebook_dir.name,
262 path.replace('/', os.sep),
262 path.replace('/', os.sep),
263 name,
263 name,
264 ))
264 ))
265
265
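_check_created expects the Location header to be the escaped join of /api/contents, path and name. A rough stdlib approximation of that expectation, assuming quoting comparable to the url_path_join/url_escape helpers this test module imports; the exact escaping of '/' may differ:

import posixpath
try:
    from urllib.parse import quote    # Python 3
except ImportError:
    from urllib import quote          # Python 2

def expected_location(path, name):
    """Approximate url_escape(url_path_join('/api/contents', path, name))."""
    url = posixpath.join(u'/api/contents', path, name)
    return quote(url.encode('utf-8'))  # default safe='/' keeps the path separators

print(expected_location(u'\xe5 b', u'Untitled0.ipynb'))
# -> /api/contents/%C3%A5%20b/Untitled0.ipynb (approximately; see caveat above)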
266 def test_create_untitled(self):
266 def test_create_untitled(self):
267 resp = self.api.create_untitled(path=u'å b')
267 resp = self.api.create_untitled(path=u'å b')
268 self._check_created(resp, 'Untitled0.ipynb', u'å b')
268 self._check_created(resp, 'Untitled0.ipynb', u'å b')
269
269
270 # Second time
270 # Second time
271 resp = self.api.create_untitled(path=u'å b')
271 resp = self.api.create_untitled(path=u'å b')
272 self._check_created(resp, 'Untitled1.ipynb', u'å b')
272 self._check_created(resp, 'Untitled1.ipynb', u'å b')
273
273
274 # And two directories down
274 # And two directories down
275 resp = self.api.create_untitled(path='foo/bar')
275 resp = self.api.create_untitled(path='foo/bar')
276 self._check_created(resp, 'Untitled0.ipynb', 'foo/bar')
276 self._check_created(resp, 'Untitled0.ipynb', 'foo/bar')
277
277
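The first create yields Untitled0.ipynb and the second Untitled1.ipynb. A toy reimplementation of that numbering, just to make the expectation explicit; the real notebook manager's increment logic may differ:

def next_untitled(existing, base='Untitled', ext='.ipynb'):
    """Pick the first base+N+ext name not already present in `existing`."""
    i = 0
    while '%s%i%s' % (base, i, ext) in existing:
        i += 1
    return '%s%i%s' % (base, i, ext)

assert next_untitled(set()) == 'Untitled0.ipynb'
assert next_untitled({'Untitled0.ipynb'}) == 'Untitled1.ipynb'
assert next_untitled(set(), base='untitled', ext='.txt') == 'untitled0.txt'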
278 def test_create_untitled_txt(self):
278 def test_create_untitled_txt(self):
279 resp = self.api.create_untitled(path='foo/bar', ext='.txt')
279 resp = self.api.create_untitled(path='foo/bar', ext='.txt')
280 self._check_created(resp, 'untitled0.txt', 'foo/bar', type='file')
280 self._check_created(resp, 'untitled0.txt', 'foo/bar', type='file')
281
281
282 resp = self.api.read(path='foo/bar', name='untitled0.txt')
282 resp = self.api.read(path='foo/bar', name='untitled0.txt')
283 model = resp.json()
283 model = resp.json()
284 self.assertEqual(model['type'], 'file')
284 self.assertEqual(model['type'], 'file')
285 self.assertEqual(model['format'], 'text')
285 self.assertEqual(model['format'], 'text')
286 self.assertEqual(model['content'], '')
286 self.assertEqual(model['content'], '')
287
287
288 def test_upload_untitled(self):
288 def test_upload_untitled(self):
289 nb = new_notebook()
289 nb = new_notebook()
290 nbmodel = {'content': nb, 'type': 'notebook'}
290 nbmodel = {'content': nb, 'type': 'notebook'}
291 resp = self.api.upload_untitled(path=u'å b',
291 resp = self.api.upload_untitled(path=u'å b',
292 body=json.dumps(nbmodel))
292 body=json.dumps(nbmodel))
293 self._check_created(resp, 'Untitled0.ipynb', u'å b')
293 self._check_created(resp, 'Untitled0.ipynb', u'å b')
294
294
295 def test_upload(self):
295 def test_upload(self):
296 nb = new_notebook()
296 nb = new_notebook()
297 nbmodel = {'content': nb, 'type': 'notebook'}
297 nbmodel = {'content': nb, 'type': 'notebook'}
298 resp = self.api.upload(u'Upload tést.ipynb', path=u'å b',
298 resp = self.api.upload(u'Upload tést.ipynb', path=u'å b',
299 body=json.dumps(nbmodel))
299 body=json.dumps(nbmodel))
300 self._check_created(resp, u'Upload tést.ipynb', u'å b')
300 self._check_created(resp, u'Upload tést.ipynb', u'å b')
301
301
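The upload helpers above send a complete JSON model for a specific filename. A hedged sketch of what such a request could look like with requests; the server address and the PUT-to-/api/contents/<path>/<name> convention are assumptions inferred from these tests, not a documented contract:

import json
import requests

base = 'http://127.0.0.1:8888'   # assumed server address, for illustration only

model = {
    'type': 'notebook',
    'content': {'metadata': {}, 'cells': [], 'nbformat': 4, 'nbformat_minor': 0},
}
# Assumed: PUT to the full path creates the notebook under exactly that name.
r = requests.put(base + '/api/contents/foo/bar/Uploaded.ipynb', data=json.dumps(model))
assert r.status_code == 201
assert r.headers['Location'].split('/')[-1] == 'Uploaded.ipynb'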
302 def test_mkdir(self):
302 def test_mkdir(self):
303 resp = self.api.mkdir(u'New ∂ir', path=u'å b')
303 resp = self.api.mkdir(u'New ∂ir', path=u'å b')
304 self._check_created(resp, u'New ∂ir', u'å b', type='directory')
304 self._check_created(resp, u'New ∂ir', u'å b', type='directory')
305
305
306 def test_mkdir_hidden_400(self):
306 def test_mkdir_hidden_400(self):
307 with assert_http_error(400):
307 with assert_http_error(400):
308 resp = self.api.mkdir(u'.hidden', path=u'å b')
308 resp = self.api.mkdir(u'.hidden', path=u'å b')
309
309
310 def test_upload_txt(self):
310 def test_upload_txt(self):
311 body = u'ünicode téxt'
311 body = u'ünicode téxt'
312 model = {
312 model = {
313 'content' : body,
313 'content' : body,
314 'format' : 'text',
314 'format' : 'text',
315 'type' : 'file',
315 'type' : 'file',
316 }
316 }
317 resp = self.api.upload(u'Upload tést.txt', path=u'å b',
317 resp = self.api.upload(u'Upload tést.txt', path=u'å b',
318 body=json.dumps(model))
318 body=json.dumps(model))
319
319
320 # check roundtrip
320 # check roundtrip
321 resp = self.api.read(path=u'å b', name=u'Upload tést.txt')
321 resp = self.api.read(path=u'å b', name=u'Upload tést.txt')
322 model = resp.json()
322 model = resp.json()
323 self.assertEqual(model['type'], 'file')
323 self.assertEqual(model['type'], 'file')
324 self.assertEqual(model['format'], 'text')
324 self.assertEqual(model['format'], 'text')
325 self.assertEqual(model['content'], body)
325 self.assertEqual(model['content'], body)
326
326
327 def test_upload_b64(self):
327 def test_upload_b64(self):
328 body = b'\xFFblob'
328 body = b'\xFFblob'
329 b64body = base64.encodestring(body).decode('ascii')
329 b64body = base64.encodestring(body).decode('ascii')
330 model = {
330 model = {
331 'content' : b64body,
331 'content' : b64body,
332 'format' : 'base64',
332 'format' : 'base64',
333 'type' : 'file',
333 'type' : 'file',
334 }
334 }
335 resp = self.api.upload(u'Upload tést.blob', path=u'å b',
335 resp = self.api.upload(u'Upload tést.blob', path=u'å b',
336 body=json.dumps(model))
336 body=json.dumps(model))
337
337
338 # check roundtrip
338 # check roundtrip
339 resp = self.api.read(path=u'å b', name=u'Upload tést.blob')
339 resp = self.api.read(path=u'å b', name=u'Upload tést.blob')
340 model = resp.json()
340 model = resp.json()
341 self.assertEqual(model['type'], 'file')
341 self.assertEqual(model['type'], 'file')
342 self.assertEqual(model['format'], 'base64')
342 self.assertEqual(model['format'], 'base64')
343 decoded = base64.decodestring(model['content'].encode('ascii'))
343 decoded = base64.decodestring(model['content'].encode('ascii'))
344 self.assertEqual(decoded, body)
344 self.assertEqual(decoded, body)
345
345
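The binary upload boils down to a base64 round-trip on the client side. A compact stdlib-only sketch of the same encode/serialize/decode cycle, mirroring the model dicts used in the test:

import base64
import json

raw = b'\xffblob'

model = {
    'type': 'file',
    'format': 'base64',
    'content': base64.b64encode(raw).decode('ascii'),
}
body = json.dumps(model)            # what would be sent as the request body

echoed = json.loads(body)           # stand-in for the model a later read returns
assert base64.b64decode(echoed['content'].encode('ascii')) == raw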
346 def test_upload_v2(self):
346 def test_upload_v2(self):
347 nb = v2.new_notebook()
347 nb = v2.new_notebook()
348 ws = v2.new_worksheet()
348 ws = v2.new_worksheet()
349 nb.worksheets.append(ws)
349 nb.worksheets.append(ws)
350 ws.cells.append(v2.new_code_cell(input='print("hi")'))
350 ws.cells.append(v2.new_code_cell(input='print("hi")'))
351 nbmodel = {'content': nb, 'type': 'notebook'}
351 nbmodel = {'content': nb, 'type': 'notebook'}
352 resp = self.api.upload(u'Upload tést.ipynb', path=u'å b',
352 resp = self.api.upload(u'Upload tést.ipynb', path=u'å b',
353 body=json.dumps(nbmodel))
353 body=json.dumps(nbmodel))
354 self._check_created(resp, u'Upload tést.ipynb', u'å b')
354 self._check_created(resp, u'Upload tést.ipynb', u'å b')
355 resp = self.api.read(u'Upload tést.ipynb', u'å b')
355 resp = self.api.read(u'Upload tést.ipynb', u'å b')
356 data = resp.json()
356 data = resp.json()
357 self.assertEqual(data['content']['nbformat'], current.nbformat)
357 self.assertEqual(data['content']['nbformat'], current.nbformat)
358
358
359 def test_copy_untitled(self):
359 def test_copy_untitled(self):
360 resp = self.api.copy_untitled(u'ç d.ipynb', path=u'å b')
360 resp = self.api.copy_untitled(u'ç d.ipynb', path=u'å b')
361 self._check_created(resp, u'ç d-Copy0.ipynb', u'å b')
361 self._check_created(resp, u'ç d-Copy0.ipynb', u'å b')
362
362
363 def test_copy(self):
363 def test_copy(self):
364 resp = self.api.copy(u'ç d.ipynb', u'cøpy.ipynb', path=u'å b')
364 resp = self.api.copy(u'ç d.ipynb', u'cøpy.ipynb', path=u'å b')
365 self._check_created(resp, u'cøpy.ipynb', u'å b')
365 self._check_created(resp, u'cøpy.ipynb', u'å b')
366
366
367 def test_copy_path(self):
367 def test_copy_path(self):
368 resp = self.api.copy(u'foo/a.ipynb', u'cøpyfoo.ipynb', path=u'å b')
368 resp = self.api.copy(u'foo/a.ipynb', u'cøpyfoo.ipynb', path=u'å b')
369 self._check_created(resp, u'cøpyfoo.ipynb', u'å b')
369 self._check_created(resp, u'cøpyfoo.ipynb', u'å b')
370
370
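The copy helpers presumably post a small model naming the source file. The sketch below guesses at that wire format with a 'copy_from' field and the same assumed server address as the earlier sketches; treat both the endpoint and the body as inferences from these tests rather than a specification:

import json
import requests

base = 'http://127.0.0.1:8888'   # assumed server address, for illustration only

# Assumed: POST to the destination directory with 'copy_from' copies an existing file,
# letting the server choose a '-CopyN' style name as test_copy_untitled expects.
r = requests.post(base + '/api/contents/foo/bar',
                  data=json.dumps({'copy_from': 'foo/a.ipynb'}))
assert r.status_code == 201
print(r.json()['name'])   # e.g. 'a-Copy0.ipynb' if the server auto-names the copy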
371 def test_copy_dir_400(self):
371 def test_copy_dir_400(self):
372 # can't copy directories
372 # can't copy directories
373 with assert_http_error(400):
373 with assert_http_error(400):
374 resp = self.api.copy(u'å b', u'å c')
374 resp = self.api.copy(u'å b', u'å c')
375
375
376 def test_delete(self):
376 def test_delete(self):
377 for d, name in self.dirs_nbs:
377 for d, name in self.dirs_nbs:
378 resp = self.api.delete('%s.ipynb' % name, d)
378 resp = self.api.delete('%s.ipynb' % name, d)
379 self.assertEqual(resp.status_code, 204)
379 self.assertEqual(resp.status_code, 204)
380
380
381 for d in self.dirs + ['/']:
381 for d in self.dirs + ['/']:
382 nbs = notebooks_only(self.api.list(d).json())
382 nbs = notebooks_only(self.api.list(d).json())
383 self.assertEqual(len(nbs), 0)
383 self.assertEqual(len(nbs), 0)
384
384
385 def test_delete_dirs(self):
385 def test_delete_dirs(self):
386 # depth-first delete everything, so we don't try to delete empty directories
386 # depth-first delete everything, so we don't try to delete empty directories
387 for name in sorted(self.dirs + ['/'], key=len, reverse=True):
387 for name in sorted(self.dirs + ['/'], key=len, reverse=True):
388 listing = self.api.list(name).json()['content']
388 listing = self.api.list(name).json()['content']
389 for model in listing:
389 for model in listing:
390 self.api.delete(model['name'], model['path'])
390 self.api.delete(model['name'], model['path'])
391 listing = self.api.list('/').json()['content']
391 listing = self.api.list('/').json()['content']
392 self.assertEqual(listing, [])
392 self.assertEqual(listing, [])
393
393
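The teardown order here leans on a small property: a child's path is always strictly longer than its parent's, so sorting by length in reverse deletes children before the directories that contain them. A standalone illustration:

dirs = ['/', 'foo', 'ordering', 'foo/bar', u'\xe5 b']   # u'\xe5 b' is u'å b'

# Unrelated directories may interleave, but 'foo/bar' is guaranteed to come out
# before 'foo', and everything before '/', which is all the teardown needs.
for d in sorted(dirs, key=len, reverse=True):
    print(d)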
394 def test_delete_non_empty_dir(self):
394 def test_delete_non_empty_dir(self):
395 """delete non-empty dir raises 400"""
395 """delete non-empty dir raises 400"""
396 with assert_http_error(400):
396 with assert_http_error(400):
397 self.api.delete(u'å b')
397 self.api.delete(u'å b')
398
398
399 def test_rename(self):
399 def test_rename(self):
400 resp = self.api.rename('a.ipynb', 'foo', 'z.ipynb')
400 resp = self.api.rename('a.ipynb', 'foo', 'z.ipynb')
401 self.assertEqual(resp.headers['Location'].split('/')[-1], 'z.ipynb')
401 self.assertEqual(resp.headers['Location'].split('/')[-1], 'z.ipynb')
402 self.assertEqual(resp.json()['name'], 'z.ipynb')
402 self.assertEqual(resp.json()['name'], 'z.ipynb')
403 assert os.path.isfile(pjoin(self.notebook_dir.name, 'foo', 'z.ipynb'))
403 assert os.path.isfile(pjoin(self.notebook_dir.name, 'foo', 'z.ipynb'))
404
404
405 nbs = notebooks_only(self.api.list('foo').json())
405 nbs = notebooks_only(self.api.list('foo').json())
406 nbnames = set(n['name'] for n in nbs)
406 nbnames = set(n['name'] for n in nbs)
407 self.assertIn('z.ipynb', nbnames)
407 self.assertIn('z.ipynb', nbnames)
408 self.assertNotIn('a.ipynb', nbnames)
408 self.assertNotIn('a.ipynb', nbnames)
409
409
410 def test_rename_existing(self):
410 def test_rename_existing(self):
411 with assert_http_error(409):
411 with assert_http_error(409):
412 self.api.rename('a.ipynb', 'foo', 'b.ipynb')
412 self.api.rename('a.ipynb', 'foo', 'b.ipynb')
413
413
414 def test_save(self):
414 def test_save(self):
415 resp = self.api.read('a.ipynb', 'foo')
415 resp = self.api.read('a.ipynb', 'foo')
416 nbcontent = json.loads(resp.text)['content']
416 nbcontent = json.loads(resp.text)['content']
417 nb = to_notebook_json(nbcontent)
417 nb = from_dict(nbcontent)
418 nb.cells.append(new_markdown_cell(u'Created by test ³'))
418 nb.cells.append(new_markdown_cell(u'Created by test ³'))
419
419
420 nbmodel= {'name': 'a.ipynb', 'path':'foo', 'content': nb, 'type': 'notebook'}
420 nbmodel= {'name': 'a.ipynb', 'path':'foo', 'content': nb, 'type': 'notebook'}
421 resp = self.api.save('a.ipynb', path='foo', body=json.dumps(nbmodel))
421 resp = self.api.save('a.ipynb', path='foo', body=json.dumps(nbmodel))
422
422
423 nbfile = pjoin(self.notebook_dir.name, 'foo', 'a.ipynb')
423 nbfile = pjoin(self.notebook_dir.name, 'foo', 'a.ipynb')
424 with io.open(nbfile, 'r', encoding='utf-8') as f:
424 with io.open(nbfile, 'r', encoding='utf-8') as f:
425 newnb = read(f, format='ipynb')
425 newnb = read(f, format='ipynb')
426 self.assertEqual(newnb.cells[0].source,
426 self.assertEqual(newnb.cells[0].source,
427 u'Created by test ³')
427 u'Created by test ³')
428 nbcontent = self.api.read('a.ipynb', 'foo').json()['content']
428 nbcontent = self.api.read('a.ipynb', 'foo').json()['content']
429 newnb = to_notebook_json(nbcontent)
429 newnb = from_dict(nbcontent)
430 self.assertEqual(newnb.cells[0].source,
430 self.assertEqual(newnb.cells[0].source,
431 u'Created by test ³')
431 u'Created by test ³')
432
432
433 # Save and rename
433 # Save and rename
434 nbmodel= {'name': 'a2.ipynb', 'path':'foo/bar', 'content': nb, 'type': 'notebook'}
434 nbmodel= {'name': 'a2.ipynb', 'path':'foo/bar', 'content': nb, 'type': 'notebook'}
435 resp = self.api.save('a.ipynb', path='foo', body=json.dumps(nbmodel))
435 resp = self.api.save('a.ipynb', path='foo', body=json.dumps(nbmodel))
436 saved = resp.json()
436 saved = resp.json()
437 self.assertEqual(saved['name'], 'a2.ipynb')
437 self.assertEqual(saved['name'], 'a2.ipynb')
438 self.assertEqual(saved['path'], 'foo/bar')
438 self.assertEqual(saved['path'], 'foo/bar')
439 assert os.path.isfile(pjoin(self.notebook_dir.name,'foo','bar','a2.ipynb'))
439 assert os.path.isfile(pjoin(self.notebook_dir.name,'foo','bar','a2.ipynb'))
440 assert not os.path.isfile(pjoin(self.notebook_dir.name, 'foo', 'a.ipynb'))
440 assert not os.path.isfile(pjoin(self.notebook_dir.name, 'foo', 'a.ipynb'))
441 with assert_http_error(404):
441 with assert_http_error(404):
442 self.api.read('a.ipynb', 'foo')
442 self.api.read('a.ipynb', 'foo')
443
443
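The save test shows the pattern this commit is about: content arrives as plain JSON dicts, and from_dict rebuilds attribute-access notebook objects before cells are appended. A minimal sketch of that round-trip in isolation, assuming the same nbformat imports this test module already uses:

import json
from IPython.nbformat.current import from_dict, new_notebook, new_markdown_cell

nb = new_notebook()
nb.cells.append(new_markdown_cell(u'hello'))

# Simulate the API round-trip: the model travels as JSON, so it comes back as dicts.
content = json.loads(json.dumps({'content': nb}))['content']

nb2 = from_dict(content)                # dict -> NotebookNode
assert nb2.cells[0].source == u'hello'  # attribute access works again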
444 def test_checkpoints(self):
444 def test_checkpoints(self):
445 resp = self.api.read('a.ipynb', 'foo')
445 resp = self.api.read('a.ipynb', 'foo')
446 r = self.api.new_checkpoint('a.ipynb', 'foo')
446 r = self.api.new_checkpoint('a.ipynb', 'foo')
447 self.assertEqual(r.status_code, 201)
447 self.assertEqual(r.status_code, 201)
448 cp1 = r.json()
448 cp1 = r.json()
449 self.assertEqual(set(cp1), {'id', 'last_modified'})
449 self.assertEqual(set(cp1), {'id', 'last_modified'})
450 self.assertEqual(r.headers['Location'].split('/')[-1], cp1['id'])
450 self.assertEqual(r.headers['Location'].split('/')[-1], cp1['id'])
451
451
452 # Modify it
452 # Modify it
453 nbcontent = json.loads(resp.text)['content']
453 nbcontent = json.loads(resp.text)['content']
454 nb = to_notebook_json(nbcontent)
454 nb = from_dict(nbcontent)
455 hcell = new_markdown_cell('Created by test')
455 hcell = new_markdown_cell('Created by test')
456 nb.cells.append(hcell)
456 nb.cells.append(hcell)
457 # Save
457 # Save
458 nbmodel= {'name': 'a.ipynb', 'path':'foo', 'content': nb, 'type': 'notebook'}
458 nbmodel= {'name': 'a.ipynb', 'path':'foo', 'content': nb, 'type': 'notebook'}
459 resp = self.api.save('a.ipynb', path='foo', body=json.dumps(nbmodel))
459 resp = self.api.save('a.ipynb', path='foo', body=json.dumps(nbmodel))
460
460
461 # List checkpoints
461 # List checkpoints
462 cps = self.api.get_checkpoints('a.ipynb', 'foo').json()
462 cps = self.api.get_checkpoints('a.ipynb', 'foo').json()
463 self.assertEqual(cps, [cp1])
463 self.assertEqual(cps, [cp1])
464
464
465 nbcontent = self.api.read('a.ipynb', 'foo').json()['content']
465 nbcontent = self.api.read('a.ipynb', 'foo').json()['content']
466 nb = to_notebook_json(nbcontent)
466 nb = from_dict(nbcontent)
467 self.assertEqual(nb.cells[0].source, 'Created by test')
467 self.assertEqual(nb.cells[0].source, 'Created by test')
468
468
469 # Restore cp1
469 # Restore cp1
470 r = self.api.restore_checkpoint('a.ipynb', 'foo', cp1['id'])
470 r = self.api.restore_checkpoint('a.ipynb', 'foo', cp1['id'])
471 self.assertEqual(r.status_code, 204)
471 self.assertEqual(r.status_code, 204)
472 nbcontent = self.api.read('a.ipynb', 'foo').json()['content']
472 nbcontent = self.api.read('a.ipynb', 'foo').json()['content']
473 nb = to_notebook_json(nbcontent)
473 nb = from_dict(nbcontent)
474 self.assertEqual(nb.cells, [])
474 self.assertEqual(nb.cells, [])
475
475
476 # Delete cp1
476 # Delete cp1
477 r = self.api.delete_checkpoint('a.ipynb', 'foo', cp1['id'])
477 r = self.api.delete_checkpoint('a.ipynb', 'foo', cp1['id'])
478 self.assertEqual(r.status_code, 204)
478 self.assertEqual(r.status_code, 204)
479 cps = self.api.get_checkpoints('a.ipynb', 'foo').json()
479 cps = self.api.get_checkpoints('a.ipynb', 'foo').json()
480 self.assertEqual(cps, [])
480 self.assertEqual(cps, [])
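For orientation, the checkpoint cycle above (create, list, modify and save, restore, delete) presumably maps onto a per-file checkpoints resource. The endpoint layout and status codes below are assumptions reconstructed from the wrapper calls and asserts in this test, not taken from the handlers themselves:

import requests

base = 'http://127.0.0.1:8888'                            # assumed server address
cp_url = base + '/api/contents/foo/a.ipynb/checkpoints'   # assumed endpoint layout

cp = requests.post(cp_url).json()                   # create: {'id': ..., 'last_modified': ...}
assert requests.get(cp_url).json() == [cp]          # list shows the new checkpoint
assert requests.post(cp_url + '/' + cp['id']).status_code == 204    # restore
assert requests.delete(cp_url + '/' + cp['id']).status_code == 204  # delete
assert requests.get(cp_url).json() == []            # and it is gone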