tweens.py
125 lines
| 4.2 KiB
| text/x-python
|
PythonLexer
/ rhodecode / tweens.py
r5088 | # Copyright (C) 2010-2023 RhodeCode GmbH | |||
r1 | # | |||
# This program is free software: you can redistribute it and/or modify | ||||
# it under the terms of the GNU Affero General Public License, version 3 | ||||
# (only), as published by the Free Software Foundation. | ||||
# | ||||
# This program is distributed in the hope that it will be useful, | ||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of | ||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||||
# GNU General Public License for more details. | ||||
# | ||||
# You should have received a copy of the GNU Affero General Public License | ||||
# along with this program. If not, see <http://www.gnu.org/licenses/>. | ||||
# | ||||
# This program is dual-licensed. If you wish to learn more about the | ||||
# RhodeCode Enterprise Edition, including its added features, Support services, | ||||
# and proprietary license terms, please see https://rhodecode.com/licenses/ | ||||
import logging | ||||
r5014 | ||||
import pyramid.tweens | ||||
r3145 | from pyramid.httpexceptions import HTTPException, HTTPBadRequest | |||
r1 | ||||
r1297 | from rhodecode.lib.middleware.vcs import ( | |||
detect_vcs_request, VCS_TYPE_KEY, VCS_TYPE_SKIP) | ||||
r1 | ||||
r1297 | ||||
r1 | log = logging.getLogger(__name__) | |||
r1309 | def vcs_detection_tween_factory(handler, registry): | |||
r1297 | ||||
r1309 | def vcs_detection_tween(request): | |||
r1 | """ | |||
r1309 | Do detection of vcs type, and save results for other layers to re-use | |||
this information | ||||
r1 | """ | |||
r2351 | vcs_server_enabled = request.registry.settings.get('vcs.server.enable') | |||
r5014 | ||||
r2351 | vcs_handler = vcs_server_enabled and detect_vcs_request( | |||
r1297 | request.environ, request.registry.settings.get('vcs.backends')) | |||
if vcs_handler: | ||||
r1300 | # save detected VCS type for later re-use | |||
r1297 | request.environ[VCS_TYPE_KEY] = vcs_handler.SCM | |||
r1309 | request.vcs_call = vcs_handler.SCM | |||
r2351 | ||||
r4944 | log.debug('Processing request with `%s` handler', handler.__name__) | |||
r1297 | return handler(request) | |||
r1300 | # mark that we didn't detect an VCS, and we can skip detection later on | |||
r1297 | request.environ[VCS_TYPE_KEY] = VCS_TYPE_SKIP | |||
r1 | ||||
r4944 | log.debug('Processing request with `%s` handler', handler.__name__) | |||
r669 | return handler(request) | |||
r1 | ||||
r1309 | return vcs_detection_tween | |||
r1 | ||||
r3145 | def junk_encoding_detector(request): | |||
""" | ||||
Detect bad encoded GET params, and fail immediately with BadRequest | ||||
""" | ||||
try: | ||||
request.GET.get("", None) | ||||
except UnicodeDecodeError: | ||||
raise HTTPBadRequest("Invalid bytes in query string.") | ||||
def bad_url_data_detector(request): | ||||
""" | ||||
Detect invalid bytes in a path. | ||||
""" | ||||
try: | ||||
request.path_info | ||||
except UnicodeDecodeError: | ||||
raise HTTPBadRequest("Invalid bytes in URL.") | ||||
def junk_form_data_detector(request): | ||||
""" | ||||
Detect bad encoded POST params, and fail immediately with BadRequest | ||||
""" | ||||
if request.method == "POST": | ||||
try: | ||||
request.POST.get("", None) | ||||
except ValueError: | ||||
raise HTTPBadRequest("Invalid bytes in form data.") | ||||
def sanity_check_factory(handler, registry): | ||||
def sanity_check(request): | ||||
r3776 | log.debug('Checking current URL sanity for bad data') | |||
r3145 | try: | |||
junk_encoding_detector(request) | ||||
bad_url_data_detector(request) | ||||
junk_form_data_detector(request) | ||||
except HTTPException as exc: | ||||
return exc | ||||
return handler(request) | ||||
return sanity_check | ||||
r1 | def includeme(config): | |||
config.add_subscriber('rhodecode.subscribers.add_renderer_globals', | ||||
'pyramid.events.BeforeRender') | ||||
r4878 | config.add_subscriber('rhodecode.subscribers.update_celery_conf', | |||
'pyramid.events.NewRequest') | ||||
r1307 | config.add_subscriber('rhodecode.subscribers.set_user_lang', | |||
'pyramid.events.NewRequest') | ||||
r4768 | config.add_subscriber('rhodecode.subscribers.reset_log_bucket', | |||
'pyramid.events.NewRequest') | ||||
r1903 | config.add_subscriber('rhodecode.subscribers.add_request_user_context', | |||
'pyramid.events.ContextFound') | ||||
r3537 | config.add_tween('rhodecode.tweens.vcs_detection_tween_factory') | |||
r3145 | config.add_tween('rhodecode.tweens.sanity_check_factory') | |||
r4157 | ||||
# This needs to be the LAST item | ||||
r5014 | config.add_tween('rhodecode.lib.middleware.request_wrapper.RequestWrapperTween', under=pyramid.tweens.INGRESS) | |||
r4608 | log.debug('configured all tweens') | |||