test_wsgi_app_caller.py
98 lines
| 3.0 KiB
| text/x-python
|
PythonLexer
# RhodeCode VCSServer provides access to different vcs backends via network.
# Copyright (C) 2014-2024 RhodeCode GmbH
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software Foundation,
# Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
import wsgiref.simple_server | ||||
import wsgiref.validate | ||||
from vcsserver import wsgi_app_caller | ||||
r1249 | from vcsserver.lib.str_utils import ascii_bytes, safe_str | |||
r130 | ||||
@wsgiref.validate.validator
def demo_app(environ, start_response):
    """Minimal WSGI application used as a fixture by the tests in this module.

    Reads up to 1024 bytes from ``environ['wsgi.input']`` and echoes them
    back, followed by one ``key=value`` line per environ entry in sorted key
    order. Two banner lines are emitted through the legacy ``write`` callable
    returned by ``start_response`` before the body iterable is returned.

    :param environ: WSGI environment dictionary.
    :param start_response: WSGI ``start_response`` callable.
    :return: list of body chunks, each line converted with ``ascii_bytes``.
    """
    request_body = safe_str(environ['wsgi.input'].read(1024))

    lines = ['Hello World!\n', f'input_data={request_body}\n']
    lines.extend(f'{name}={val}\n' for name, val in sorted(environ.items()))

    # start_response hands back the imperative "write" callable; use it for
    # the banner so callers can check that written data precedes the body.
    legacy_write = start_response("200 OK", [('Content-Type', 'text/plain')])
    for banner_chunk in (b'Old school write method\n', b'***********************\n'):
        legacy_write(banner_chunk)

    return [ascii_bytes(line) for line in lines]
r130 | ||||
# Baseline WSGI environ shared by the tests below; each test works on its own
# copy. It deliberately contains no ``wsgi.*`` keys.
BASE_ENVIRON = {
    # Request line / server identity.
    'REQUEST_METHOD': 'GET',
    'SERVER_NAME': 'localhost',
    'SERVER_PORT': '80',
    # URL components.
    'SCRIPT_NAME': '',
    'PATH_INFO': '/',
    'QUERY_STRING': '',
    # Custom marker entry; test_wsgi_app_caller looks for it in the response.
    'foo.var': 'bla',
}
def test_complete_environ():
    """``_complete_environ`` yields an environ accepted by wsgiref's checker.

    After the call, ``wsgiref.validate.check_environ`` must accept the
    environ and ``wsgi.input`` must return the bytes that were passed in.
    """
    payload = b"data"
    env = BASE_ENVIRON.copy()

    wsgi_app_caller._complete_environ(env, payload)

    wsgiref.validate.check_environ(env)
    assert env['wsgi.input'].read(1024) == payload
r130 | ||||
def test_start_response():
    """``_StartResponse`` exposes the status and headers it was called with."""
    expected_status = '200 OK'
    expected_headers = [('Content-Type', 'text/plain')]

    recorder = wsgi_app_caller._StartResponse()
    recorder(expected_status, expected_headers)

    assert recorder.status == expected_status
    assert recorder.headers == expected_headers
def test_start_response_with_error():
    """``_StartResponse`` also records status/headers when given a third argument.

    The third positional argument is a ``(None, None, None)`` triple, which
    occupies the ``exc_info`` slot of the WSGI ``start_response`` signature.
    """
    expected_status = '500 Internal Server Error'
    expected_headers = [('Content-Type', 'text/plain')]
    empty_exc_info = (None, None, None)

    recorder = wsgi_app_caller._StartResponse()
    recorder(expected_status, expected_headers, empty_exc_info)

    assert recorder.status == expected_status
    assert recorder.headers == expected_headers
def test_wsgi_app_caller():
    """End-to-end check: ``WSGIAppCaller.handle`` runs ``demo_app`` and returns its output.

    Verifies the returned status and headers, that data sent through the
    legacy ``write`` callable comes first in the joined response, and that
    the greeting, the custom environ entry and the echoed request body are
    all present.
    """
    # NOTE(review): the body is passed as ``str`` here while
    # test_complete_environ passes ``bytes`` -- confirm ``handle`` is meant
    # to accept text input.
    input_data = 'some text'
    env = dict(BASE_ENVIRON)

    app_caller = wsgi_app_caller.WSGIAppCaller(demo_app)
    chunks, status, headers = app_caller.handle(env, input_data)
    body = b''.join(chunks)

    assert status == '200 OK'
    assert headers == [('Content-Type', 'text/plain')]

    written_prefix = b'Old school write method\n***********************\n'
    assert body.startswith(written_prefix)
    assert b'Hello World!\n' in body
    assert b'foo.var=bla\n' in body

    echoed_input = ascii_bytes(f'input_data={input_data}\n')
    assert echoed_input in body