s/os.path.sep/os.sep/
MinRK
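
The rename is purely mechanical: os.sep and os.path.sep name the same separator string, because os re-exports sep from the platform path module (posixpath or ntpath), so every replacement below is behavior-preserving. A quick sanity check, as a hypothetical snippet rather than part of this commit:

    import os
    import os.path

    # os.sep is re-exported from the platform path module (posixpath/ntpath),
    # so both names point at the same one-character separator:
    # '/' on POSIX systems, '\\' on Windows.
    assert os.sep == os.path.sep
    print(os.sep)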
@@ -1,529 +1,529 @@
1 1 """Base Tornado handlers for the notebook.
2 2
3 3 Authors:
4 4
5 5 * Brian Granger
6 6 """
7 7
8 8 #-----------------------------------------------------------------------------
9 9 # Copyright (C) 2011 The IPython Development Team
10 10 #
11 11 # Distributed under the terms of the BSD License. The full license is in
12 12 # the file COPYING, distributed as part of this software.
13 13 #-----------------------------------------------------------------------------
14 14
15 15 #-----------------------------------------------------------------------------
16 16 # Imports
17 17 #-----------------------------------------------------------------------------
18 18
19 19
20 20 import datetime
21 21 import email.utils
22 22 import functools
23 23 import hashlib
24 24 import json
25 25 import logging
26 26 import mimetypes
27 27 import os
28 28 import stat
29 29 import sys
30 30 import threading
31 31 import traceback
32 32
33 33 from tornado import web
34 34 from tornado import websocket
35 35
36 36 try:
37 37 from tornado.log import app_log
38 38 except ImportError:
39 39 app_log = logging.getLogger()
40 40
41 41 from IPython.config import Application
42 42 from IPython.external.decorator import decorator
43 43 from IPython.utils.path import filefind
44 44 from IPython.utils.jsonutil import date_default
45 45
46 46 # UF_HIDDEN is a stat flag not defined in the stat module.
47 47 # It is used by BSD to indicate hidden files.
48 48 UF_HIDDEN = getattr(stat, 'UF_HIDDEN', 32768)
49 49
50 50 #-----------------------------------------------------------------------------
51 51 # Monkeypatch for Tornado <= 2.1.1 - Remove when no longer necessary!
52 52 #-----------------------------------------------------------------------------
53 53
54 54 # Google Chrome, as of release 16, changed its websocket protocol number. The
55 55 # parts tornado cares about haven't really changed, so it's OK to continue
56 56 # accepting Chrome connections, but as of Tornado 2.1.1 (the currently released
57 57 # version as of Oct 30/2011) the version check fails, see the issue report:
58 58
59 59 # https://github.com/facebook/tornado/issues/385
60 60
61 61 # This issue has been fixed in Tornado post 2.1.1:
62 62
63 63 # https://github.com/facebook/tornado/commit/84d7b458f956727c3b0d6710
64 64
65 65 # Here we manually apply the same patch as above so that users of IPython can
66 66 # continue to work with an officially released Tornado. We make the
67 67 # monkeypatch version check as narrow as possible to limit its effects; once
68 68 # Tornado 2.1.1 is no longer found in the wild we'll delete this code.
69 69
70 70 import tornado
71 71
72 72 if tornado.version_info <= (2,1,1):
73 73
74 74 def _execute(self, transforms, *args, **kwargs):
75 75 from tornado.websocket import WebSocketProtocol8, WebSocketProtocol76
76 76
77 77 self.open_args = args
78 78 self.open_kwargs = kwargs
79 79
80 80 # The difference between version 8 and 13 is that in 8 the
81 81 # client sends a "Sec-Websocket-Origin" header and in 13 it's
82 82 # simply "Origin".
83 83 if self.request.headers.get("Sec-WebSocket-Version") in ("7", "8", "13"):
84 84 self.ws_connection = WebSocketProtocol8(self)
85 85 self.ws_connection.accept_connection()
86 86
87 87 elif self.request.headers.get("Sec-WebSocket-Version"):
88 88 self.stream.write(tornado.escape.utf8(
89 89 "HTTP/1.1 426 Upgrade Required\r\n"
90 90 "Sec-WebSocket-Version: 8\r\n\r\n"))
91 91 self.stream.close()
92 92
93 93 else:
94 94 self.ws_connection = WebSocketProtocol76(self)
95 95 self.ws_connection.accept_connection()
96 96
97 97 websocket.WebSocketHandler._execute = _execute
98 98 del _execute
99 99
100 100
101 101 #-----------------------------------------------------------------------------
102 102 # Top-level handlers
103 103 #-----------------------------------------------------------------------------
104 104
105 105 class RequestHandler(web.RequestHandler):
106 106 """RequestHandler with default variable setting."""
107 107
108 108 def render(*args, **kwargs):
109 109 kwargs.setdefault('message', '')
110 110 return web.RequestHandler.render(*args, **kwargs)
111 111
112 112 class AuthenticatedHandler(RequestHandler):
113 113 """A RequestHandler with an authenticated user."""
114 114
115 115 def clear_login_cookie(self):
116 116 self.clear_cookie(self.cookie_name)
117 117
118 118 def get_current_user(self):
119 119 user_id = self.get_secure_cookie(self.cookie_name)
120 120 # For now the user_id should not return empty, but it could eventually
121 121 if user_id == '':
122 122 user_id = 'anonymous'
123 123 if user_id is None:
124 124 # prevent extra Invalid cookie sig warnings:
125 125 self.clear_login_cookie()
126 126 if not self.login_available:
127 127 user_id = 'anonymous'
128 128 return user_id
129 129
130 130 @property
131 131 def cookie_name(self):
132 132 default_cookie_name = 'username-{host}'.format(
133 133 host=self.request.host,
134 134 ).replace(':', '-')
135 135 return self.settings.get('cookie_name', default_cookie_name)
136 136
137 137 @property
138 138 def password(self):
139 139 """our password"""
140 140 return self.settings.get('password', '')
141 141
142 142 @property
143 143 def logged_in(self):
144 144 """Is a user currently logged in?
145 145
146 146 """
147 147 user = self.get_current_user()
148 148 return (user and not user == 'anonymous')
149 149
150 150 @property
151 151 def login_available(self):
152 152 """May a user proceed to log in?
153 153
154 154 This returns True if login capability is available, irrespective of
155 155 whether the user is already logged in or not.
156 156
157 157 """
158 158 return bool(self.settings.get('password', ''))
159 159
160 160
161 161 class IPythonHandler(AuthenticatedHandler):
162 162 """IPython-specific extensions to authenticated handling
163 163
164 164 Mostly property shortcuts to IPython-specific settings.
165 165 """
166 166
167 167 @property
168 168 def config(self):
169 169 return self.settings.get('config', None)
170 170
171 171 @property
172 172 def log(self):
173 173 """use the IPython log by default, falling back on tornado's logger"""
174 174 if Application.initialized():
175 175 return Application.instance().log
176 176 else:
177 177 return app_log
178 178
179 179 @property
180 180 def use_less(self):
181 181 """Use less instead of css in templates"""
182 182 return self.settings.get('use_less', False)
183 183
184 184 #---------------------------------------------------------------
185 185 # URLs
186 186 #---------------------------------------------------------------
187 187
188 188 @property
189 189 def ws_url(self):
190 190 """websocket url matching the current request
191 191
192 192 By default, this is just `''`, indicating that it should match
193 193 the same host, protocol, port, etc.
194 194 """
195 195 return self.settings.get('websocket_url', '')
196 196
197 197 @property
198 198 def mathjax_url(self):
199 199 return self.settings.get('mathjax_url', '')
200 200
201 201 @property
202 202 def base_project_url(self):
203 203 return self.settings.get('base_project_url', '/')
204 204
205 205 @property
206 206 def base_kernel_url(self):
207 207 return self.settings.get('base_kernel_url', '/')
208 208
209 209 #---------------------------------------------------------------
210 210 # Manager objects
211 211 #---------------------------------------------------------------
212 212
213 213 @property
214 214 def kernel_manager(self):
215 215 return self.settings['kernel_manager']
216 216
217 217 @property
218 218 def notebook_manager(self):
219 219 return self.settings['notebook_manager']
220 220
221 221 @property
222 222 def cluster_manager(self):
223 223 return self.settings['cluster_manager']
224 224
225 225 @property
226 226 def session_manager(self):
227 227 return self.settings['session_manager']
228 228
229 229 @property
230 230 def project_dir(self):
231 231 return self.notebook_manager.notebook_dir
232 232
233 233 #---------------------------------------------------------------
234 234 # template rendering
235 235 #---------------------------------------------------------------
236 236
237 237 def get_template(self, name):
238 238 """Return the jinja template object for a given name"""
239 239 return self.settings['jinja2_env'].get_template(name)
240 240
241 241 def render_template(self, name, **ns):
242 242 ns.update(self.template_namespace)
243 243 template = self.get_template(name)
244 244 return template.render(**ns)
245 245
246 246 @property
247 247 def template_namespace(self):
248 248 return dict(
249 249 base_project_url=self.base_project_url,
250 250 base_kernel_url=self.base_kernel_url,
251 251 logged_in=self.logged_in,
252 252 login_available=self.login_available,
253 253 use_less=self.use_less,
254 254 )
255 255
256 256 def get_json_body(self):
257 257 """Return the body of the request as JSON data."""
258 258 if not self.request.body:
259 259 return None
260 260 # Do we need to call body.decode('utf-8') here?
261 261 body = self.request.body.strip().decode(u'utf-8')
262 262 try:
263 263 model = json.loads(body)
264 264 except Exception:
265 265 self.log.debug("Bad JSON: %r", body)
266 266 self.log.error("Couldn't parse JSON", exc_info=True)
267 267 raise web.HTTPError(400, u'Invalid JSON in body of request')
268 268 return model
269 269
270 270
271 271 class AuthenticatedFileHandler(IPythonHandler, web.StaticFileHandler):
272 272 """static files should only be accessible when logged in"""
273 273
274 274 @web.authenticated
275 275 def get(self, path):
276 276 if os.path.splitext(path)[1] == '.ipynb':
277 277 name = os.path.basename(path)
278 278 self.set_header('Content-Type', 'application/json')
279 279 self.set_header('Content-Disposition','attachment; filename="%s"' % name)
280 280
281 281 return web.StaticFileHandler.get(self, path)
282 282
283 283 def validate_absolute_path(self, root, absolute_path):
284 284 """Validate and return the absolute path.
285 285
286 286 Requires tornado 3.1
287 287
288 288 Adding to tornado's own handling, forbids the serving of hidden files.
289 289 """
290 290 abs_path = super(AuthenticatedFileHandler, self).validate_absolute_path(root, absolute_path)
291 291 abs_root = os.path.abspath(root)
292 292 self.forbid_hidden(abs_root, abs_path)
293 293 return abs_path
294 294
295 295 def forbid_hidden(self, absolute_root, absolute_path):
296 296 """Raise 403 if a file is hidden or contained in a hidden directory.
297 297
298 298 Hidden is determined by either name starting with '.'
299 299 or the UF_HIDDEN flag as reported by stat
300 300 """
301 301 inside_root = absolute_path[len(absolute_root):]
302 if any(part.startswith('.') for part in inside_root.split(os.path.sep)):
302 if any(part.startswith('.') for part in inside_root.split(os.sep)):
303 303 raise web.HTTPError(403)
304 304
305 305 # check UF_HIDDEN on any location up to root
306 306 path = absolute_path
307 307 while path and path.startswith(absolute_root):
308 308 st = os.stat(path)
309 309 if getattr(st, 'st_flags', 0) & UF_HIDDEN:
310 310 raise web.HTTPError(403)
311 311 path, _ = os.path.split(path)
312 312
313 313 return absolute_path
314 314
315 315
316 316 def json_errors(method):
317 317 """Decorate methods with this to return GitHub style JSON errors.
318 318
319 319 This should be used on any JSON API on any handler method that can raise HTTPErrors.
320 320
321 321 This will grab the latest HTTPError exception using sys.exc_info
322 322 and then:
323 323
324 324 1. Set the HTTP status code based on the HTTPError
325 325 2. Create and return a JSON body with a message field describing
326 326 the error in a human readable form.
327 327 """
328 328 @functools.wraps(method)
329 329 def wrapper(self, *args, **kwargs):
330 330 try:
331 331 result = method(self, *args, **kwargs)
332 332 except web.HTTPError as e:
333 333 status = e.status_code
334 334 message = e.log_message
335 335 self.set_status(e.status_code)
336 336 self.finish(json.dumps(dict(message=message)))
337 337 except Exception:
338 338 self.log.error("Unhandled error in API request", exc_info=True)
339 339 status = 500
340 340 message = "Unknown server error"
341 341 t, value, tb = sys.exc_info()
342 342 self.set_status(status)
343 343 tb_text = ''.join(traceback.format_exception(t, value, tb))
344 344 reply = dict(message=message, traceback=tb_text)
345 345 self.finish(json.dumps(reply))
346 346 else:
347 347 return result
348 348 return wrapper
349 349
350 350
351 351
352 352 #-----------------------------------------------------------------------------
353 353 # File handler
354 354 #-----------------------------------------------------------------------------
355 355
356 356 # to minimize subclass changes:
357 357 HTTPError = web.HTTPError
358 358
359 359 class FileFindHandler(web.StaticFileHandler):
360 360 """subclass of StaticFileHandler for serving files from a search path"""
361 361
362 362 _static_paths = {}
363 363 # _lock is needed for tornado < 2.2.0 compat
364 364 _lock = threading.Lock() # protects _static_hashes
365 365
366 366 def initialize(self, path, default_filename=None):
367 367 if isinstance(path, basestring):
368 368 path = [path]
369 369 self.roots = tuple(
370 os.path.abspath(os.path.expanduser(p)) + os.path.sep for p in path
370 os.path.abspath(os.path.expanduser(p)) + os.sep for p in path
371 371 )
372 372 self.default_filename = default_filename
373 373
374 374 @classmethod
375 375 def locate_file(cls, path, roots):
376 376 """locate a file to serve on our static file search path"""
377 377 with cls._lock:
378 378 if path in cls._static_paths:
379 379 return cls._static_paths[path]
380 380 try:
381 381 abspath = os.path.abspath(filefind(path, roots))
382 382 except IOError:
383 383 # empty string should always give exists=False
384 384 return ''
385 385
386 386 # os.path.abspath strips a trailing /
387 387 # it needs to be temporarily added back for requests to root/
388 if not (abspath + os.path.sep).startswith(roots):
388 if not (abspath + os.sep).startswith(roots):
389 389 raise HTTPError(403, "%s is not in root static directory", path)
390 390
391 391 cls._static_paths[path] = abspath
392 392 return abspath
393 393
394 394 def get(self, path, include_body=True):
395 395 path = self.parse_url_path(path)
396 396
397 397 # begin subclass override
398 398 abspath = self.locate_file(path, self.roots)
399 399 # end subclass override
400 400
401 401 if os.path.isdir(abspath) and self.default_filename is not None:
402 402 # need to look at the request.path here for when path is empty
403 403 # but there is some prefix to the path that was already
404 404 # trimmed by the routing
405 405 if not self.request.path.endswith("/"):
406 406 self.redirect(self.request.path + "/")
407 407 return
408 408 abspath = os.path.join(abspath, self.default_filename)
409 409 if not os.path.exists(abspath):
410 410 raise HTTPError(404)
411 411 if not os.path.isfile(abspath):
412 412 raise HTTPError(403, "%s is not a file", path)
413 413
414 414 stat_result = os.stat(abspath)
415 415 modified = datetime.datetime.utcfromtimestamp(stat_result[stat.ST_MTIME])
416 416
417 417 self.set_header("Last-Modified", modified)
418 418
419 419 mime_type, encoding = mimetypes.guess_type(abspath)
420 420 if mime_type:
421 421 self.set_header("Content-Type", mime_type)
422 422
423 423 cache_time = self.get_cache_time(path, modified, mime_type)
424 424
425 425 if cache_time > 0:
426 426 self.set_header("Expires", datetime.datetime.utcnow() + \
427 427 datetime.timedelta(seconds=cache_time))
428 428 self.set_header("Cache-Control", "max-age=" + str(cache_time))
429 429 else:
430 430 self.set_header("Cache-Control", "public")
431 431
432 432 self.set_extra_headers(path)
433 433
434 434 # Check the If-Modified-Since, and don't send the result if the
435 435 # content has not been modified
436 436 ims_value = self.request.headers.get("If-Modified-Since")
437 437 if ims_value is not None:
438 438 date_tuple = email.utils.parsedate(ims_value)
439 439 if_since = datetime.datetime(*date_tuple[:6])
440 440 if if_since >= modified:
441 441 self.set_status(304)
442 442 return
443 443
444 444 with open(abspath, "rb") as file:
445 445 data = file.read()
446 446 hasher = hashlib.sha1()
447 447 hasher.update(data)
448 448 self.set_header("Etag", '"%s"' % hasher.hexdigest())
449 449 if include_body:
450 450 self.write(data)
451 451 else:
452 452 assert self.request.method == "HEAD"
453 453 self.set_header("Content-Length", len(data))
454 454
455 455 @classmethod
456 456 def get_version(cls, settings, path):
457 457 """Generate the version string to be used in static URLs.
458 458
459 459 This method may be overridden in subclasses (but note that it
460 460 is a class method rather than a static method). The default
461 461 implementation uses a hash of the file's contents.
462 462
463 463 ``settings`` is the `Application.settings` dictionary and ``path``
464 464 is the relative location of the requested asset on the filesystem.
465 465 The returned value should be a string, or ``None`` if no version
466 466 could be determined.
467 467 """
468 468 # begin subclass override:
469 469 static_paths = settings['static_path']
470 470 if isinstance(static_paths, basestring):
471 471 static_paths = [static_paths]
472 472 roots = tuple(
473 os.path.abspath(os.path.expanduser(p)) + os.path.sep for p in static_paths
473 os.path.abspath(os.path.expanduser(p)) + os.sep for p in static_paths
474 474 )
475 475
476 476 try:
477 477 abs_path = filefind(path, roots)
478 478 except IOError:
479 479 app_log.error("Could not find static file %r", path)
480 480 return None
481 481
482 482 # end subclass override
483 483
484 484 with cls._lock:
485 485 hashes = cls._static_hashes
486 486 if abs_path not in hashes:
487 487 try:
488 488 f = open(abs_path, "rb")
489 489 hashes[abs_path] = hashlib.md5(f.read()).hexdigest()
490 490 f.close()
491 491 except Exception:
492 492 app_log.error("Could not open static file %r", path)
493 493 hashes[abs_path] = None
494 494 hsh = hashes.get(abs_path)
495 495 if hsh:
496 496 return hsh[:5]
497 497 return None
498 498
499 499
500 500 def parse_url_path(self, url_path):
501 501 """Converts a static URL path into a filesystem path.
502 502
503 503 ``url_path`` is the path component of the URL with
504 504 ``static_url_prefix`` removed. The return value should be
505 505 filesystem path relative to ``static_path``.
506 506 """
507 if os.path.sep != "/":
508 url_path = url_path.replace("/", os.path.sep)
507 if os.sep != "/":
508 url_path = url_path.replace("/", os.sep)
509 509 return url_path
510 510
511 511 class TrailingSlashHandler(web.RequestHandler):
512 512 """Simple redirect handler that strips trailing slashes
513 513
514 514 This should be the first, highest priority handler.
515 515 """
516 516
517 517 SUPPORTED_METHODS = ['GET']
518 518
519 519 def get(self):
520 520 self.redirect(self.request.uri.rstrip('/'))
521 521
522 522 #-----------------------------------------------------------------------------
523 523 # URL to handler mappings
524 524 #-----------------------------------------------------------------------------
525 525
526 526
527 527 default_handlers = [
528 528 (r".*/", TrailingSlashHandler)
529 529 ]
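
For reference, forbid_hidden above rejects a request when any path component under the static root starts with a dot; a standalone sketch of that check follows (hypothetical helper name, simplified to drop the UF_HIDDEN stat-flag walk):

    import os

    def is_hidden_path(absolute_path, absolute_root):
        # Portion of the path below the root, e.g. '/.secret/foo.ipynb'.
        inside_root = absolute_path[len(absolute_root):]
        # Hidden if any component starts with '.', splitting on the OS separator.
        return any(part.startswith('.') for part in inside_root.split(os.sep))

    # is_hidden_path('/srv/nb/.secret/foo.ipynb', '/srv/nb')  -> True
    # is_hidden_path('/srv/nb/visible/foo.ipynb', '/srv/nb')  -> False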
@@ -1,318 +1,318 @@
1 1 # coding: utf-8
2 2 """Test the notebooks webservice API."""
3 3
4 4 import io
5 5 import json
6 6 import os
7 7 import shutil
8 8 from unicodedata import normalize
9 9
10 10 pjoin = os.path.join
11 11
12 12 import requests
13 13
14 14 from IPython.html.utils import url_path_join, url_escape
15 15 from IPython.html.tests.launchnotebook import NotebookTestBase, assert_http_error
16 16 from IPython.nbformat import current
17 17 from IPython.nbformat.current import (new_notebook, write, read, new_worksheet,
18 18 new_heading_cell, to_notebook_json)
19 19 from IPython.nbformat import v2
20 20 from IPython.utils import py3compat
21 21 from IPython.utils.data import uniq_stable
22 22
23 23
24 24 class NBAPI(object):
25 25 """Wrapper for notebook API calls."""
26 26 def __init__(self, base_url):
27 27 self.base_url = base_url
28 28
29 29 def _req(self, verb, path, body=None):
30 30 response = requests.request(verb,
31 31 url_path_join(self.base_url, 'api/notebooks', path),
32 32 data=body,
33 33 )
34 34 response.raise_for_status()
35 35 return response
36 36
37 37 def list(self, path='/'):
38 38 return self._req('GET', path)
39 39
40 40 def read(self, name, path='/'):
41 41 return self._req('GET', url_path_join(path, name))
42 42
43 43 def create_untitled(self, path='/'):
44 44 return self._req('POST', path)
45 45
46 46 def upload_untitled(self, body, path='/'):
47 47 return self._req('POST', path, body)
48 48
49 49 def copy_untitled(self, copy_from, path='/'):
50 50 body = json.dumps({'copy_from':copy_from})
51 51 return self._req('POST', path, body)
52 52
53 53 def create(self, name, path='/'):
54 54 return self._req('PUT', url_path_join(path, name))
55 55
56 56 def upload(self, name, body, path='/'):
57 57 return self._req('PUT', url_path_join(path, name), body)
58 58
59 59 def copy(self, copy_from, copy_to, path='/'):
60 60 body = json.dumps({'copy_from':copy_from})
61 61 return self._req('PUT', url_path_join(path, copy_to), body)
62 62
63 63 def save(self, name, body, path='/'):
64 64 return self._req('PUT', url_path_join(path, name), body)
65 65
66 66 def delete(self, name, path='/'):
67 67 return self._req('DELETE', url_path_join(path, name))
68 68
69 69 def rename(self, name, path, new_name):
70 70 body = json.dumps({'name': new_name})
71 71 return self._req('PATCH', url_path_join(path, name), body)
72 72
73 73 def get_checkpoints(self, name, path):
74 74 return self._req('GET', url_path_join(path, name, 'checkpoints'))
75 75
76 76 def new_checkpoint(self, name, path):
77 77 return self._req('POST', url_path_join(path, name, 'checkpoints'))
78 78
79 79 def restore_checkpoint(self, name, path, checkpoint_id):
80 80 return self._req('POST', url_path_join(path, name, 'checkpoints', checkpoint_id))
81 81
82 82 def delete_checkpoint(self, name, path, checkpoint_id):
83 83 return self._req('DELETE', url_path_join(path, name, 'checkpoints', checkpoint_id))
84 84
85 85 class APITest(NotebookTestBase):
86 86 """Test the kernels web service API"""
87 87 dirs_nbs = [('', 'inroot'),
88 88 ('Directory with spaces in', 'inspace'),
89 89 (u'unicodé', 'innonascii'),
90 90 ('foo', 'a'),
91 91 ('foo', 'b'),
92 92 ('foo', 'name with spaces'),
93 93 ('foo', u'unicodé'),
94 94 ('foo/bar', 'baz'),
95 95 (u'å b', u'ç d')
96 96 ]
97 97
98 98 dirs = uniq_stable([d for (d,n) in dirs_nbs])
99 99 del dirs[0] # remove ''
100 100
101 101 def setUp(self):
102 102 nbdir = self.notebook_dir.name
103 103
104 104 for d in self.dirs:
105 d.replace('/', os.path.sep)
105 d.replace('/', os.sep)
106 106 if not os.path.isdir(pjoin(nbdir, d)):
107 107 os.mkdir(pjoin(nbdir, d))
108 108
109 109 for d, name in self.dirs_nbs:
110 d = d.replace('/', os.path.sep)
110 d = d.replace('/', os.sep)
111 111 with io.open(pjoin(nbdir, d, '%s.ipynb' % name), 'w') as f:
112 112 nb = new_notebook(name=name)
113 113 write(nb, f, format='ipynb')
114 114
115 115 self.nb_api = NBAPI(self.base_url())
116 116
117 117 def tearDown(self):
118 118 nbdir = self.notebook_dir.name
119 119
120 120 for dname in ['foo', 'Directory with spaces in', u'unicodé', u'å b']:
121 121 shutil.rmtree(pjoin(nbdir, dname), ignore_errors=True)
122 122
123 123 if os.path.isfile(pjoin(nbdir, 'inroot.ipynb')):
124 124 os.unlink(pjoin(nbdir, 'inroot.ipynb'))
125 125
126 126 def test_list_notebooks(self):
127 127 nbs = self.nb_api.list().json()
128 128 self.assertEqual(len(nbs), 1)
129 129 self.assertEqual(nbs[0]['name'], 'inroot.ipynb')
130 130
131 131 nbs = self.nb_api.list('/Directory with spaces in/').json()
132 132 self.assertEqual(len(nbs), 1)
133 133 self.assertEqual(nbs[0]['name'], 'inspace.ipynb')
134 134
135 135 nbs = self.nb_api.list(u'/unicodé/').json()
136 136 self.assertEqual(len(nbs), 1)
137 137 self.assertEqual(nbs[0]['name'], 'innonascii.ipynb')
138 138 self.assertEqual(nbs[0]['path'], u'unicodé')
139 139
140 140 nbs = self.nb_api.list('/foo/bar/').json()
141 141 self.assertEqual(len(nbs), 1)
142 142 self.assertEqual(nbs[0]['name'], 'baz.ipynb')
143 143 self.assertEqual(nbs[0]['path'], 'foo/bar')
144 144
145 145 nbs = self.nb_api.list('foo').json()
146 146 self.assertEqual(len(nbs), 4)
147 147 nbnames = { normalize('NFC', n['name']) for n in nbs }
148 148 expected = [ u'a.ipynb', u'b.ipynb', u'name with spaces.ipynb', u'unicodé.ipynb']
149 149 expected = { normalize('NFC', name) for name in expected }
150 150 self.assertEqual(nbnames, expected)
151 151
152 152 def test_list_nonexistant_dir(self):
153 153 with assert_http_error(404):
154 154 self.nb_api.list('nonexistant')
155 155
156 156 def test_get_contents(self):
157 157 for d, name in self.dirs_nbs:
158 158 nb = self.nb_api.read('%s.ipynb' % name, d+'/').json()
159 159 self.assertEqual(nb['name'], u'%s.ipynb' % name)
160 160 self.assertIn('content', nb)
161 161 self.assertIn('metadata', nb['content'])
162 162 self.assertIsInstance(nb['content']['metadata'], dict)
163 163
164 164 # Name that doesn't exist - should be a 404
165 165 with assert_http_error(404):
166 166 self.nb_api.read('q.ipynb', 'foo')
167 167
168 168 def _check_nb_created(self, resp, name, path):
169 169 self.assertEqual(resp.status_code, 201)
170 170 location_header = py3compat.str_to_unicode(resp.headers['Location'])
171 171 self.assertEqual(location_header, url_escape(url_path_join(u'/api/notebooks', path, name)))
172 172 self.assertEqual(resp.json()['name'], name)
173 173 assert os.path.isfile(pjoin(
174 174 self.notebook_dir.name,
175 path.replace('/', os.path.sep),
175 path.replace('/', os.sep),
176 176 name,
177 177 ))
178 178
179 179 def test_create_untitled(self):
180 180 resp = self.nb_api.create_untitled(path=u'å b')
181 181 self._check_nb_created(resp, 'Untitled0.ipynb', u'å b')
182 182
183 183 # Second time
184 184 resp = self.nb_api.create_untitled(path=u'å b')
185 185 self._check_nb_created(resp, 'Untitled1.ipynb', u'å b')
186 186
187 187 # And two directories down
188 188 resp = self.nb_api.create_untitled(path='foo/bar')
189 189 self._check_nb_created(resp, 'Untitled0.ipynb', 'foo/bar')
190 190
191 191 def test_upload_untitled(self):
192 192 nb = new_notebook(name='Upload test')
193 193 nbmodel = {'content': nb}
194 194 resp = self.nb_api.upload_untitled(path=u'å b',
195 195 body=json.dumps(nbmodel))
196 196 self._check_nb_created(resp, 'Untitled0.ipynb', u'å b')
197 197
198 198 def test_upload(self):
199 199 nb = new_notebook(name=u'ignored')
200 200 nbmodel = {'content': nb}
201 201 resp = self.nb_api.upload(u'Upload tést.ipynb', path=u'å b',
202 202 body=json.dumps(nbmodel))
203 203 self._check_nb_created(resp, u'Upload tést.ipynb', u'å b')
204 204
205 205 def test_upload_v2(self):
206 206 nb = v2.new_notebook()
207 207 ws = v2.new_worksheet()
208 208 nb.worksheets.append(ws)
209 209 ws.cells.append(v2.new_code_cell(input='print("hi")'))
210 210 nbmodel = {'content': nb}
211 211 resp = self.nb_api.upload(u'Upload tést.ipynb', path=u'å b',
212 212 body=json.dumps(nbmodel))
213 213 self._check_nb_created(resp, u'Upload tést.ipynb', u'å b')
214 214 resp = self.nb_api.read(u'Upload tést.ipynb', u'å b')
215 215 data = resp.json()
216 216 self.assertEqual(data['content']['nbformat'], current.nbformat)
217 217 self.assertEqual(data['content']['orig_nbformat'], 2)
218 218
219 219 def test_copy_untitled(self):
220 220 resp = self.nb_api.copy_untitled(u'ç d.ipynb', path=u'å b')
221 221 self._check_nb_created(resp, u'ç d-Copy0.ipynb', u'å b')
222 222
223 223 def test_copy(self):
224 224 resp = self.nb_api.copy(u'ç d.ipynb', u'cøpy.ipynb', path=u'å b')
225 225 self._check_nb_created(resp, u'cøpy.ipynb', u'å b')
226 226
227 227 def test_delete(self):
228 228 for d, name in self.dirs_nbs:
229 229 resp = self.nb_api.delete('%s.ipynb' % name, d)
230 230 self.assertEqual(resp.status_code, 204)
231 231
232 232 for d in self.dirs + ['/']:
233 233 nbs = self.nb_api.list(d).json()
234 234 self.assertEqual(len(nbs), 0)
235 235
236 236 def test_rename(self):
237 237 resp = self.nb_api.rename('a.ipynb', 'foo', 'z.ipynb')
238 238 self.assertEqual(resp.headers['Location'].split('/')[-1], 'z.ipynb')
239 239 self.assertEqual(resp.json()['name'], 'z.ipynb')
240 240 assert os.path.isfile(pjoin(self.notebook_dir.name, 'foo', 'z.ipynb'))
241 241
242 242 nbs = self.nb_api.list('foo').json()
243 243 nbnames = set(n['name'] for n in nbs)
244 244 self.assertIn('z.ipynb', nbnames)
245 245 self.assertNotIn('a.ipynb', nbnames)
246 246
247 247 def test_save(self):
248 248 resp = self.nb_api.read('a.ipynb', 'foo')
249 249 nbcontent = json.loads(resp.text)['content']
250 250 nb = to_notebook_json(nbcontent)
251 251 ws = new_worksheet()
252 252 nb.worksheets = [ws]
253 253 ws.cells.append(new_heading_cell(u'Created by test ³'))
254 254
255 255 nbmodel= {'name': 'a.ipynb', 'path':'foo', 'content': nb}
256 256 resp = self.nb_api.save('a.ipynb', path='foo', body=json.dumps(nbmodel))
257 257
258 258 nbfile = pjoin(self.notebook_dir.name, 'foo', 'a.ipynb')
259 259 with io.open(nbfile, 'r', encoding='utf-8') as f:
260 260 newnb = read(f, format='ipynb')
261 261 self.assertEqual(newnb.worksheets[0].cells[0].source,
262 262 u'Created by test ³')
263 263 nbcontent = self.nb_api.read('a.ipynb', 'foo').json()['content']
264 264 newnb = to_notebook_json(nbcontent)
265 265 self.assertEqual(newnb.worksheets[0].cells[0].source,
266 266 u'Created by test ³')
267 267
268 268 # Save and rename
269 269 nbmodel= {'name': 'a2.ipynb', 'path':'foo/bar', 'content': nb}
270 270 resp = self.nb_api.save('a.ipynb', path='foo', body=json.dumps(nbmodel))
271 271 saved = resp.json()
272 272 self.assertEqual(saved['name'], 'a2.ipynb')
273 273 self.assertEqual(saved['path'], 'foo/bar')
274 274 assert os.path.isfile(pjoin(self.notebook_dir.name,'foo','bar','a2.ipynb'))
275 275 assert not os.path.isfile(pjoin(self.notebook_dir.name, 'foo', 'a.ipynb'))
276 276 with assert_http_error(404):
277 277 self.nb_api.read('a.ipynb', 'foo')
278 278
279 279 def test_checkpoints(self):
280 280 resp = self.nb_api.read('a.ipynb', 'foo')
281 281 r = self.nb_api.new_checkpoint('a.ipynb', 'foo')
282 282 self.assertEqual(r.status_code, 201)
283 283 cp1 = r.json()
284 284 self.assertEqual(set(cp1), {'id', 'last_modified'})
285 285 self.assertEqual(r.headers['Location'].split('/')[-1], cp1['id'])
286 286
287 287 # Modify it
288 288 nbcontent = json.loads(resp.text)['content']
289 289 nb = to_notebook_json(nbcontent)
290 290 ws = new_worksheet()
291 291 nb.worksheets = [ws]
292 292 hcell = new_heading_cell('Created by test')
293 293 ws.cells.append(hcell)
294 294 # Save
295 295 nbmodel= {'name': 'a.ipynb', 'path':'foo', 'content': nb}
296 296 resp = self.nb_api.save('a.ipynb', path='foo', body=json.dumps(nbmodel))
297 297
298 298 # List checkpoints
299 299 cps = self.nb_api.get_checkpoints('a.ipynb', 'foo').json()
300 300 self.assertEqual(cps, [cp1])
301 301
302 302 nbcontent = self.nb_api.read('a.ipynb', 'foo').json()['content']
303 303 nb = to_notebook_json(nbcontent)
304 304 self.assertEqual(nb.worksheets[0].cells[0].source, 'Created by test')
305 305
306 306 # Restore cp1
307 307 r = self.nb_api.restore_checkpoint('a.ipynb', 'foo', cp1['id'])
308 308 self.assertEqual(r.status_code, 204)
309 309 nbcontent = self.nb_api.read('a.ipynb', 'foo').json()['content']
310 310 nb = to_notebook_json(nbcontent)
311 311 self.assertEqual(nb.worksheets, [])
312 312
313 313 # Delete cp1
314 314 r = self.nb_api.delete_checkpoint('a.ipynb', 'foo', cp1['id'])
315 315 self.assertEqual(r.status_code, 204)
316 316 cps = self.nb_api.get_checkpoints('a.ipynb', 'foo').json()
317 317 self.assertEqual(cps, [])
318 318
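
The NBAPI helper above is a thin wrapper that maps each method to an HTTP verb and path under /api/notebooks. A rough usage sketch follows; the URL and paths are illustrative and assume a notebook server is already listening at that base URL:

    # Hypothetical usage of the NBAPI wrapper defined above.
    api = NBAPI('http://127.0.0.1:8888')
    api.create_untitled(path='foo')                          # POST   /api/notebooks/foo
    print(api.list('foo').json())                            # GET    /api/notebooks/foo
    api.rename('Untitled0.ipynb', 'foo', 'renamed.ipynb')    # PATCH  with body {"name": ...}
    api.delete('renamed.ipynb', 'foo')                       # DELETE /api/notebooks/foo/renamed.ipynb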
@@ -1,51 +1,51 @@
1 1 # coding: utf-8
2 2 """Test the /files/ handler."""
3 3
4 4 import io
5 5 import os
6 6 from unicodedata import normalize
7 7
8 8 pjoin = os.path.join
9 9
10 10 import requests
11 11
12 12 from IPython.html.utils import url_path_join
13 13 from .launchnotebook import NotebookTestBase
14 14 from IPython.utils import py3compat
15 15
16 16 class FilesTest(NotebookTestBase):
17 17 def test_hidden_files(self):
18 18 not_hidden = [
19 19 u'å b',
20 20 pjoin(u'å b/ç. d')
21 21 ]
22 22 hidden = [
23 23 u'.å b',
24 24 pjoin(u'å b/.ç d')
25 25 ]
26 26 dirs = not_hidden + hidden
27 27
28 28 nbdir = self.notebook_dir.name
29 29 for d in dirs:
30 path = pjoin(nbdir, d.replace('/', os.path.sep))
30 path = pjoin(nbdir, d.replace('/', os.sep))
31 31 if not os.path.exists(path):
32 32 os.mkdir(path)
33 33 with open(pjoin(path, 'foo'), 'w') as f:
34 34 f.write('foo')
35 35 with open(pjoin(path, '.foo'), 'w') as f:
36 36 f.write('.foo')
37 37 url = self.base_url()
38 38
39 39 for d in not_hidden:
40 path = pjoin(nbdir, d.replace('/', os.path.sep))
40 path = pjoin(nbdir, d.replace('/', os.sep))
41 41 r = requests.get(url_path_join(url, 'files', d, 'foo'))
42 42 r.raise_for_status()
43 43 self.assertEqual(r.content, b'foo')
44 44 r = requests.get(url_path_join(url, 'files', d, '.foo'))
45 45 self.assertEqual(r.status_code, 403)
46 46
47 47 for d in hidden:
48 path = pjoin(nbdir, d.replace('/', os.path.sep))
48 path = pjoin(nbdir, d.replace('/', os.sep))
49 49 for foo in ('foo', '.foo'):
50 50 r = requests.get(url_path_join(url, 'files', d, foo))
51 51 self.assertEqual(r.status_code, 403)
@@ -1,71 +1,71 @@
1 1 """Notebook related utilities
2 2
3 3 Authors:
4 4
5 5 * Brian Granger
6 6 """
7 7
8 8 #-----------------------------------------------------------------------------
9 9 # Copyright (C) 2011 The IPython Development Team
10 10 #
11 11 # Distributed under the terms of the BSD License. The full license is in
12 12 # the file COPYING, distributed as part of this software.
13 13 #-----------------------------------------------------------------------------
14 14
15 15 import os
16 16 from urllib import quote, unquote
17 17
18 18 from IPython.utils import py3compat
19 19
20 20 #-----------------------------------------------------------------------------
21 21 # Imports
22 22 #-----------------------------------------------------------------------------
23 23
24 24 def url_path_join(*pieces):
25 25 """Join components of url into a relative url
26 26
27 27 Use to prevent double slash when joining subpath. This will leave the
28 28 initial and final / in place
29 29 """
30 30 initial = pieces[0].startswith('/')
31 31 final = pieces[-1].endswith('/')
32 32 stripped = [s.strip('/') for s in pieces]
33 33 result = '/'.join(s for s in stripped if s)
34 34 if initial: result = '/' + result
35 35 if final: result = result + '/'
36 36 if result == '//': result = '/'
37 37 return result
38 38
39 39 def path2url(path):
40 40 """Convert a local file path to a URL"""
41 pieces = [ quote(p) for p in path.split(os.path.sep) ]
41 pieces = [ quote(p) for p in path.split(os.sep) ]
42 42 # preserve trailing /
43 43 if pieces[-1] == '':
44 44 pieces[-1] = '/'
45 45 url = url_path_join(*pieces)
46 46 return url
47 47
48 48 def url2path(url):
49 49 """Convert a URL to a local file path"""
50 50 pieces = [ unquote(p) for p in url.split('/') ]
51 51 path = os.path.join(*pieces)
52 52 return path
53 53
54 54 def url_escape(path):
55 55 """Escape special characters in a URL path
56 56
57 57 Turns '/foo bar/' into '/foo%20bar/'
58 58 """
59 59 parts = py3compat.unicode_to_str(path).split('/')
60 60 return u'/'.join([quote(p) for p in parts])
61 61
62 62 def url_unescape(path):
63 63 """Unescape special characters in a URL path
64 64
65 65 Turns '/foo%20bar/' into '/foo bar/'
66 66 """
67 67 return u'/'.join([
68 68 py3compat.str_to_unicode(unquote(p))
69 69 for p in py3compat.unicode_to_str(path).split('/')
70 70 ])
71 71
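
path2url and url2path above translate between OS paths (joined with os.sep) and URL paths (joined with '/'), percent-encoding or decoding each component on the way. A rough Python 3 adaptation for illustration only — the module itself targets Python 2, where quote/unquote come from urllib rather than urllib.parse, and the real path2url also preserves trailing slashes:

    import os
    from urllib.parse import quote, unquote

    def path2url(path):
        # Split on the OS separator and percent-encode each piece,
        # then rejoin with '/' (simplified: no trailing-slash handling).
        return '/'.join(quote(p) for p in path.split(os.sep))

    def url2path(url):
        # Reverse: decode each piece and rejoin with the OS separator.
        return os.path.join(*[unquote(p) for p in url.split('/')])

    sample = os.sep.join(['foo bar', 'baz.ipynb'])
    assert path2url(sample) == 'foo%20bar/baz.ipynb'
    assert url2path(path2url(sample)) == sample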