|
|
# Copyright (C) 2010-2024 RhodeCode GmbH
|
|
|
#
|
|
|
# This program is free software: you can redistribute it and/or modify
|
|
|
# it under the terms of the GNU Affero General Public License, version 3
|
|
|
# (only), as published by the Free Software Foundation.
|
|
|
#
|
|
|
# This program is distributed in the hope that it will be useful,
|
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
|
# GNU General Public License for more details.
|
|
|
#
|
|
|
# You should have received a copy of the GNU Affero General Public License
|
|
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
#
|
|
|
# This program is dual-licensed. If you wish to learn more about the
|
|
|
# RhodeCode Enterprise Edition, including its added features, Support services,
|
|
|
# and proprietary license terms, please see https://rhodecode.com/licenses/
|
|
|
|
|
|
import logging
|
|
|
import textwrap
|
|
|
|
|
|
import routes.middleware
|
|
|
import urlobject
|
|
|
import webob
|
|
|
import webob.exc
|
|
|
|
|
|
import rhodecode.lib.auth
|
|
|
from rhodecode.lib.middleware.utils import get_path_info
|
|
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
|
class CSRFDetector(object):
|
|
|
"""
|
|
|
Middleware for preventing CSRF.
|
|
|
|
|
|
|
|
|
It checks that all requests are either GET or POST.
|
|
|
For POST requests, it logs the requests that do not have a CSRF token.
|
|
|
Eventually it will raise an error.
|
|
|
|
|
|
It special cases some endpoints as they do not really require a token.
|
|
|
|
|
|
Note: this middleware is only intended for testing.
|
|
|
"""
|
|
|
|
|
|
_PUT_DELETE_MESSAGE = textwrap.dedent('''
|
|
|
Do not call in tests app.delete or app.put, use instead
|
|
|
app.post(..., params={'_method': 'delete'}.
|
|
|
|
|
|
The reason is twofold. The first is because that's how the browser is
|
|
|
calling rhodecode and the second is because it allow us to detect
|
|
|
potential CSRF.''').strip()
|
|
|
|
|
|
_PATHS_WITHOUT_TOKEN = frozenset((
|
|
|
# The password is the token.
|
|
|
'/_admin/login',
|
|
|
# Captcha may be enabled.
|
|
|
'/_admin/password_reset',
|
|
|
# Captcha may be enabled.
|
|
|
'/_admin/password_reset_confirmation',
|
|
|
# Captcha may be enabled.
|
|
|
'/_admin/register',
|
|
|
# No change in state with this controller.
|
|
|
'/error/document',
|
|
|
))
|
|
|
|
|
|
_SKIP_PATTERN = frozenset((
|
|
|
'/_admin/gists/',
|
|
|
))
|
|
|
|
|
|
def __init__(self, app):
|
|
|
self._app = app
|
|
|
|
|
|
def __call__(self, environ, start_response):
|
|
|
if environ['REQUEST_METHOD'].upper() not in ('GET', 'POST'):
|
|
|
raise Exception(self._PUT_DELETE_MESSAGE)
|
|
|
path_info = get_path_info(environ)
|
|
|
token_expected = path_info not in self._PATHS_WITHOUT_TOKEN
|
|
|
allowed = True
|
|
|
for pattern in self._SKIP_PATTERN:
|
|
|
if path_info.startswith(pattern):
|
|
|
allowed = False
|
|
|
break
|
|
|
|
|
|
if (environ['REQUEST_METHOD'] == 'POST' and
|
|
|
token_expected and allowed and
|
|
|
routes.middleware.is_form_post(environ)):
|
|
|
body = environ['wsgi.input']
|
|
|
if body.seekable():
|
|
|
pos = body.tell()
|
|
|
content = body.read()
|
|
|
body.seek(pos)
|
|
|
elif hasattr(body, 'peek'):
|
|
|
content = body.peek()
|
|
|
else:
|
|
|
raise Exception("Cannot check if the request has a CSRF token")
|
|
|
if rhodecode.lib.auth.csrf_token_key not in content:
|
|
|
raise Exception(
|
|
|
'%s to %s does not have a csrf_token %r' %
|
|
|
(environ['REQUEST_METHOD'], path_info, content))
|
|
|
|
|
|
return self._app(environ, start_response)
|
|
|
|
|
|
|
|
|
def _get_scheme_host_port(url):
|
|
|
url = urlobject.URLObject(url)
|
|
|
if '://' not in url:
|
|
|
return None, url, None
|
|
|
|
|
|
scheme = url.scheme or 'http'
|
|
|
port = url.port
|
|
|
if not port:
|
|
|
if scheme == 'http':
|
|
|
port = 80
|
|
|
elif scheme == 'https':
|
|
|
port = 443
|
|
|
host = url.netloc.without_port()
|
|
|
|
|
|
return scheme, host, port
|
|
|
|
|
|
|
|
|
def _equivalent_urls(url1, url2):
|
|
|
"""Check if both urls are equivalent."""
|
|
|
return _get_scheme_host_port(url1) == _get_scheme_host_port(url2)
|
|
|
|
|
|
|
|
|
class OriginChecker(object):
|
|
|
"""
|
|
|
Check whether the request has a valid Origin header.
|
|
|
|
|
|
See https://wiki.mozilla.org/Security/Origin for details.
|
|
|
"""
|
|
|
|
|
|
def __init__(self, app, expected_origin, skip_urls=None):
|
|
|
"""
|
|
|
:param expected_origin: the value we expect to see for the Origin
|
|
|
header.
|
|
|
:param skip_urls: list of urls for which we do not need to check the
|
|
|
Origin header.
|
|
|
"""
|
|
|
self._app = app
|
|
|
self._expected_origin = expected_origin
|
|
|
self._skip_urls = frozenset(skip_urls or [])
|
|
|
|
|
|
def __call__(self, environ, start_response):
|
|
|
origin_header = environ.get('HTTP_ORIGIN', '')
|
|
|
origin = origin_header.split(' ', 1)[0]
|
|
|
if origin == 'null':
|
|
|
origin = None
|
|
|
path_info = get_path_info(environ)
|
|
|
if (path_info not in self._skip_urls and origin and
|
|
|
not _equivalent_urls(origin, self._expected_origin)):
|
|
|
log.warning(
|
|
|
'Invalid Origin header detected: got %s, expected %s',
|
|
|
origin_header, self._expected_origin)
|
|
|
return webob.exc.HTTPForbidden('Origin header mismatch')(
|
|
|
environ, start_response)
|
|
|
else:
|
|
|
return self._app(environ, start_response)
|
|
|
|