echo_app.py
56 lines
| 1.5 KiB
| text/x-python
|
PythonLexer
r1126 | # Copyright (C) 2014-2023 RhodeCode GmbH | |||
r0 | """ | |||
Implementation of :class:`EchoApp`. | ||||
This WSGI application will just echo back the data which it recieves. | ||||
""" | ||||
import logging | ||||
log = logging.getLogger(__name__) | ||||
class EchoApp(object): | ||||
def __init__(self, repo_path, repo_name, config): | ||||
self._repo_path = repo_path | ||||
log.info("EchoApp initialized for %s", repo_path) | ||||
def __call__(self, environ, start_response): | ||||
log.debug("EchoApp called for %s", self._repo_path) | ||||
log.debug("Content-Length: %s", environ.get('CONTENT_LENGTH')) | ||||
environ['wsgi.input'].read() | ||||
status = '200 OK' | ||||
r246 | headers = [('Content-Type', 'text/plain')] | |||
r0 | start_response(status, headers) | |||
r1048 | return [b"ECHO"] | |||
r0 | ||||
r248 | class EchoAppStream(object): | |||
def __init__(self, repo_path, repo_name, config): | ||||
self._repo_path = repo_path | ||||
log.info("EchoApp initialized for %s", repo_path) | ||||
def __call__(self, environ, start_response): | ||||
log.debug("EchoApp called for %s", self._repo_path) | ||||
log.debug("Content-Length: %s", environ.get('CONTENT_LENGTH')) | ||||
environ['wsgi.input'].read() | ||||
status = '200 OK' | ||||
headers = [('Content-Type', 'text/plain')] | ||||
start_response(status, headers) | ||||
def generator(): | ||||
r982 | for _ in range(1000000): | |||
r1048 | yield b"ECHO_STREAM" | |||
r248 | return generator() | |||
r0 | def create_app(): | |||
""" | ||||
Allows to run this app directly in a WSGI server. | ||||
""" | ||||
stub_config = {} | ||||
return EchoApp('stub_path', 'stub_name', stub_config) | ||||