httpserverauth.py
113 lines
| 3.8 KiB
| text/x-python
|
PythonLexer
/ tests / httpserverauth.py
Matt Harbison
|
r41725 | from __future__ import absolute_import | ||
import base64 | ||||
Matt Harbison
|
r41727 | import hashlib | ||
Matt Harbison
|
r41725 | |||
from mercurial.hgweb import common | ||||
Matt Harbison
|
r41727 | from mercurial import ( | ||
node, | ||||
) | ||||
def parse_keqv_list(req, l):
    """Parse list of key=value strings where keys are not duplicated.

    ``req`` is accepted for interface compatibility and is not used here.
    ``l`` is an iterable of bytes elements of the form ``key=value``.  Each
    element is split on the first ``=`` only, so a value may itself contain
    ``=``.  A value enclosed in a pair of double quotes has the quotes
    removed.

    Returns a dict mapping key bytes to value bytes.  If a key does repeat,
    the last occurrence wins.

    Raises ValueError if an element contains no ``=``.
    """
    parsed = {}
    for elt in l:
        k, v = elt.split(b'=', 1)
        # Require two bytes so that a lone '"' is not treated as both the
        # opening and the closing quote (which would turn it into b'').
        if len(v) >= 2 and v[0:1] == b'"' and v[-1:] == b'"':
            v = v[1:-1]
        parsed[k] = v
    return parsed
class digestauthserver(object):
    """Server side of HTTP Digest authentication, limited to MD5/qop=auth.

    Credentials are registered with adduser(); makechallenge() produces the
    parameters for a ``WWW-Authenticate`` header and checkauth() validates
    the parameters of a client's ``Authorization`` header.  All values are
    handled as bytes.
    """

    def __init__(self):
        # Maps (user, realm) to the hex MD5 of b'user:realm:password'.
        self._user_hashes = {}

    def gethashers(self):
        """Return the ``(h, kd)`` pair of hash helpers.

        ``h(x)`` is the hex-encoded MD5 digest of ``x``; ``kd(s, d)`` is
        ``h`` applied to ``s`` and ``d`` joined by a colon.
        """
        def _md5sum(x):
            m = hashlib.md5()
            m.update(x)
            return node.hex(m.digest())

        h = _md5sum

        kd = lambda s, d, h=h: h(b"%s:%s" % (s, d))
        return h, kd

    def adduser(self, user, password, realm):
        """Register ``user`` in ``realm``, storing only the hashed secret."""
        h, _kd = self.gethashers()
        a1 = h(b'%s:%s:%s' % (user, realm, password))
        self._user_hashes[(user, realm)] = a1

    def makechallenge(self, realm):
        """Return the digest challenge parameters for ``realm`` as bytes."""
        # We aren't testing the protocol here, just that the bytes make the
        # proper round trip.  So hardcoded seems fine.
        nonce = b'064af982c5b571cea6450d8eda91c20d'
        return b'realm="%s", nonce="%s", algorithm=MD5, qop="auth"' % (realm,
                                                                       nonce)

    def checkauth(self, req, header):
        """Validate the digest parameters sent by a client.

        ``req`` supplies the request method and the ``wsgi.errors`` stream
        (via ``req.rawenv``) that diagnostics are written to.  ``header`` is
        the bytes of ``key=value`` parameters separated by ``b', '``.

        Returns True when the client's ``response`` equals the digest
        computed from the stored hash, and False (after logging both values)
        when it does not.

        Raises common.ErrorResponse with HTTP_FORBIDDEN for an algorithm
        other than MD5, an unregistered user/realm, a qop other than
        ``auth``, or a missing/empty ``cnonce`` or ``nc``.  Raises KeyError
        if ``username``, ``realm``, ``nonce``, ``uri`` or ``response`` is
        absent from the header.
        """
        log = req.rawenv[b'wsgi.errors']

        h, kd = self.gethashers()
        resp = parse_keqv_list(req, header.split(b', '))

        if resp.get(b'algorithm', b'MD5').upper() != b'MD5':
            log.write(b'Unsupported algorithm: %s' % resp.get(b'algorithm'))
            raise common.ErrorResponse(common.HTTP_FORBIDDEN,
                                       b"unknown algorithm")
        user = resp[b'username']
        realm = resp[b'realm']
        nonce = resp[b'nonce']

        ha1 = self._user_hashes.get((user, realm))
        if not ha1:
            log.write(b'No hash found for user/realm "%s/%s"' % (user, realm))
            raise common.ErrorResponse(common.HTTP_FORBIDDEN, b"bad user")

        qop = resp.get(b'qop', b'auth')
        if qop != b'auth':
            log.write(b"Unsupported qop: %s" % qop)
            raise common.ErrorResponse(common.HTTP_FORBIDDEN, b"bad qop")

        cnonce, ncvalue = resp.get(b'cnonce'), resp.get(b'nc')
        if not cnonce or not ncvalue:
            # bytes %-formatting rejects None on Python 3, which would turn
            # this rejection into a TypeError; log a placeholder instead.
            log.write(b'No cnonce (%s) or ncvalue (%s)'
                      % (b'None' if cnonce is None else cnonce,
                         b'None' if ncvalue is None else ncvalue))
            raise common.ErrorResponse(common.HTTP_FORBIDDEN, b"no cnonce")

        # Expected response: kd(HA1, nonce:nc:cnonce:qop:h(method:uri)).
        a2 = b'%s:%s' % (req.method, resp[b'uri'])
        noncebit = b"%s:%s:%s:%s:%s" % (nonce, ncvalue, cnonce, qop, h(a2))

        respdig = kd(ha1, noncebit)
        if respdig != resp[b'response']:
            log.write(b'User/realm "%s/%s" gave %s, but expected %s'
                      % (user, realm, resp[b'response'], respdig))
            return False

        return True
Matt Harbison
|
r41725 | |||
Matt Harbison
|
# Module-wide digest-auth state: extsetup() registers the test user on it and
# perform_authentication() consults it for Digest requests.
digest = digestauthserver()
Matt Harbison
|
def perform_authentication(hgweb, req, op):
    """Permission hook that enforces Basic or Digest authentication.

    ``hgweb`` and ``op`` are part of the hook signature and are not used.
    ``req`` must expose a ``headers`` mapping with bytes keys.

    When the request carries ``X-HgTest-AuthType: Digest`` the module-level
    ``digest`` server is used; otherwise Basic authentication is checked
    against the fixed credentials ``user``/``pass``.

    Returns None when the request is authenticated.  Raises
    common.ErrorResponse with HTTP_UNAUTHORIZED (plus a ``WWW-Authenticate``
    challenge) when no ``Authorization`` header is present, and with
    HTTP_FORBIDDEN when the supplied credentials are rejected.
    """
    auth = req.headers.get(b'Authorization')

    if req.headers.get(b'X-HgTest-AuthType') == b'Digest':
        if not auth:
            challenge = digest.makechallenge(b'mercurial')
            raise common.ErrorResponse(common.HTTP_UNAUTHORIZED, b'who',
                    [(b'WWW-Authenticate', b'Digest %s' % challenge)])

        # Skip the first 7 bytes (the length of b'Digest ') so that only the
        # key=value parameters are handed to the digest checker.
        if not digest.checkauth(req, auth[7:]):
            raise common.ErrorResponse(common.HTTP_FORBIDDEN, b'no')

        return

    if not auth:
        raise common.ErrorResponse(common.HTTP_UNAUTHORIZED, b'who',
                [(b'WWW-Authenticate', b'Basic Realm="mercurial"')])

    # A header with no credentials token (IndexError) or with undecodable
    # base64 (binascii.Error, a ValueError, on Python 3; TypeError on
    # Python 2) is a failed login, not a server error.
    try:
        creds = base64.b64decode(auth.split()[1]).split(b':', 1)
    except (IndexError, TypeError, ValueError):
        creds = None

    if creds != [b'user', b'pass']:
        raise common.ErrorResponse(common.HTTP_FORBIDDEN, b'no')
def extsetup(ui):
    """Set up the extension: seed the test account and install the hook.

    Registers the ``user``/``pass`` account for the ``mercurial`` realm on
    the module-level ``digest`` server and puts perform_authentication at
    the front of ``common.permhooks``.  ``ui`` is not used.
    """
    digest.adduser(b'user', b'pass', b'mercurial')
    common.permhooks.insert(0, perform_authentication)