wsgiheaders.py
185 lines
| 6.9 KiB
| text/x-python
|
PythonLexer
Augie Fackler
|
"""This was forked from cpython's wsgiref.headers module to work on bytes.

Header from old file showing copyright is below.

Much of this module is red-handedly pilfered from email.message in the stdlib,
so portions are Copyright (C) 2001,2002 Python Software Foundation, and were
written by Barry Warsaw.
"""
# Regular expression that matches `special' characters in parameters, the | ||||
# existence of which force quoting of the parameter value. | ||||
from __future__ import absolute_import, print_function | ||||
import re | ||||
Augie Fackler
|
r43346 | |||
Matt Harbison
|
# Bytes pattern matching any single "special" character (space, parentheses,
# angle brackets, '@', ',', ';', ':', backslash, double quote, '/', square
# brackets, '?', '='). _formatparam() searches a parameter value with it to
# decide whether the value has to be wrapped in double quotes.
tspecials = re.compile(br'[ ()<>@,;:\\"/\[\]?=]')
Augie Fackler
|
r37623 | |||
Augie Fackler
|
r43346 | |||
Augie Fackler
|
r37623 | def _formatparam(param, value=None, quote=1): | ||
"""Convenience function to format and return a key=value pair. | ||||
This will quote the value if needed or if quote is true. | ||||
""" | ||||
if value is not None and len(value) > 0: | ||||
if quote or tspecials.search(value): | ||||
Augie Fackler
|
r43347 | value = value.replace(b'\\', b'\\\\').replace(b'"', r'\"') | ||
return b'%s="%s"' % (param, value) | ||||
Augie Fackler
|
r37623 | else: | ||
Augie Fackler
|
r43347 | return b'%s=%s' % (param, value) | ||
Augie Fackler
|
r37623 | else: | ||
return param | ||||
class Headers(object):
    """Manage a collection of HTTP response headers.

    Header names and values are bytes. Names are compared
    case-insensitively, and the same name may appear more than once.
    """

    def __init__(self, headers=None):
        """Wrap ``headers``, a list of ``(name, value)`` tuples.

        The list is stored by reference rather than copied, so changes made
        through this object modify the list that was passed in. When
        ``headers`` is None a new empty list is used.

        Raises TypeError if ``headers`` is not exactly a ``list``. Unless
        Python runs with optimizations enabled, every name and value is
        also checked to be bytes (AssertionError otherwise).
        """
        headers = headers if headers is not None else []
        if type(headers) is not list:
            raise TypeError(b"Headers must be a list of name/value tuples")
        self._headers = headers
        if __debug__:
            for k, v in headers:
                self._convert_string_type(k)
                self._convert_string_type(v)

    def _convert_string_type(self, value):
        """Convert/check value type.

        Returns ``value`` unchanged when it is exactly of type bytes and
        raises AssertionError otherwise.
        """
        if type(value) is bytes:
            return value
        raise AssertionError(
            u"Header names/values must be"
            u" of type bytes (got %s)" % repr(value)
        )

    def __len__(self):
        """Return the total number of headers, including duplicates."""
        return len(self._headers)

    def __setitem__(self, name, val):
        """Set the value of a header.

        Every existing occurrence of ``name`` is removed first, then the
        new pair is appended to the end of the header list.
        """
        del self[name]
        self._headers.append(
            (self._convert_string_type(name), self._convert_string_type(val))
        )

    def __delitem__(self, name):
        """Delete all occurrences of a header, if present.

        Does *not* raise an exception if the header is missing.
        """
        name = self._convert_string_type(name.lower())
        # Slice assignment keeps the same list object, so a list handed to
        # the constructor sees the deletion as well.
        self._headers[:] = [kv for kv in self._headers if kv[0].lower() != name]

    def __getitem__(self, name):
        """Get the first header value for 'name'

        Return None if the header is missing instead of raising an exception.

        Note that if the header appeared multiple times, the first
        occurrence in the header list is the one returned. Use get_all() to
        get all the values matching a header field name.
        """
        return self.get(name)

    def __contains__(self, name):
        """Return true if the message contains the header."""
        return self.get(name) is not None

    def get_all(self, name):
        """Return a list of all the values for the named field.

        These will be sorted in the order they appeared in the original header
        list or were added to this instance, and may contain duplicates. Any
        fields deleted and re-inserted are always appended to the header list.
        If no fields exist with the given name, returns an empty list.
        """
        name = self._convert_string_type(name.lower())
        return [kv[1] for kv in self._headers if kv[0].lower() == name]

    def get(self, name, default=None):
        """Get the first header value for 'name', or return 'default'"""
        name = self._convert_string_type(name.lower())
        for k, v in self._headers:
            if k.lower() == name:
                return v
        return default

    def keys(self):
        """Return a list of all the header field names.

        These will be sorted in the order they appeared in the original header
        list, or were added to this instance, and may contain duplicates.
        Any fields deleted and re-inserted are always appended to the header
        list.
        """
        return [k for k, v in self._headers]

    def values(self):
        """Return a list of all header values.

        These will be sorted in the order they appeared in the original header
        list, or were added to this instance, and may contain duplicates.
        Any fields deleted and re-inserted are always appended to the header
        list.
        """
        return [v for k, v in self._headers]

    def items(self):
        """Get all the header fields and values.

        These will be sorted in the order they were in the original header
        list, or were added to this instance, and may contain duplicates.
        Any fields deleted and re-inserted are always appended to the header
        list.

        The returned list is a shallow copy of the internal one.
        """
        return self._headers[:]

    def __repr__(self):
        return "%s(%r)" % (self.__class__.__name__, self._headers)

    def __str__(self):
        """Return the formatted headers, complete with end line, as a
        native string.

        On Python 2 this is the same byte string __bytes__() produces. On
        Python 3 __str__ must return str, so the bytes are decoded as
        iso-8859-1, which maps every byte value to one character; use
        bytes(headers) to get the form suitable for HTTP transmission.
        """
        raw = self.__bytes__()
        if isinstance(raw, str):
            # Python 2: bytes and str are the same type.
            return raw
        return raw.decode('iso-8859-1')

    def __bytes__(self):
        """Return the formatted headers as bytes, one ``name: value`` line
        per header, each terminated by CRLF and followed by an empty line,
        suitable for direct HTTP transmission."""
        # The two trailing empty items make join() emit the CRLF that ends
        # the last header plus the blank line that ends the header section.
        return b'\r\n'.join(
            [b"%s: %s" % kv for kv in self._headers] + [b'', b'']
        )

    def setdefault(self, name, value):
        """Return first matching header value for 'name', or 'value'

        If there is no header named 'name', add a new header with name 'name'
        and value 'value'."""
        result = self.get(name)
        if result is None:
            self._headers.append(
                (
                    self._convert_string_type(name),
                    self._convert_string_type(value),
                )
            )
            return value
        else:
            return result

    def add_header(self, _name, _value, **_params):
        """Extended header setting.

        _name is the header field to add.  keyword arguments can be used to set
        additional parameters for the header field, with underscores converted
        to dashes.  Normally the parameter will be added as key="value" unless
        value is None, in which case only the key will be added.

        Example:

        h.add_header(b'content-disposition', b'attachment', filename=b'bud.gif')

        Note that unlike the corresponding 'email.message' method, this does
        *not* handle '(charset, language, value)' tuples: all values must be
        bytes or None.

        The header is always appended; existing headers with the same name
        are left in place.
        """
        parts = []
        if _value is not None:
            _value = self._convert_string_type(_value)
            parts.append(_value)
        for k, v in _params.items():
            if not isinstance(k, bytes):
                # Python 3 always delivers keyword names as str; turn them
                # into bytes so they pass the bytes-only check below.
                k = k.encode('iso-8859-1')
            k = self._convert_string_type(k).replace(b'_', b'-')
            if v is None:
                parts.append(k)
            else:
                v = self._convert_string_type(v)
                parts.append(_formatparam(k, v))
        self._headers.append(
            (self._convert_string_type(_name), b"; ".join(parts))
        )