diff --git a/tests/httpserverauth.py b/tests/httpserverauth.py --- a/tests/httpserverauth.py +++ b/tests/httpserverauth.py @@ -1,8 +1,89 @@ from __future__ import absolute_import import base64 +import hashlib from mercurial.hgweb import common +from mercurial import ( + node, +) + +def parse_keqv_list(req, l): + """Parse list of key=value strings where keys are not duplicated.""" + parsed = {} + for elt in l: + k, v = elt.split(b'=', 1) + if v[0:1] == b'"' and v[-1:] == b'"': + v = v[1:-1] + parsed[k] = v + return parsed + +class digestauthserver(object): + def __init__(self): + self._user_hashes = {} + + def gethashers(self): + def _md5sum(x): + m = hashlib.md5() + m.update(x) + return node.hex(m.digest()) + + h = _md5sum + + kd = lambda s, d, h=h: h(b"%s:%s" % (s, d)) + return h, kd + + def adduser(self, user, password, realm): + h, kd = self.gethashers() + a1 = h(b'%s:%s:%s' % (user, realm, password)) + self._user_hashes[(user, realm)] = a1 + + def makechallenge(self, realm): + # We aren't testing the protocol here, just that the bytes make the + # proper round trip. So hardcoded seems fine. + nonce = b'064af982c5b571cea6450d8eda91c20d' + return b'realm="%s", nonce="%s", algorithm=MD5, qop="auth"' % (realm, + nonce) + + def checkauth(self, req, header): + log = req.rawenv[b'wsgi.errors'] + + h, kd = self.gethashers() + resp = parse_keqv_list(req, header.split(b', ')) + + if resp.get(b'algorithm', b'MD5').upper() != b'MD5': + log.write(b'Unsupported algorithm: %s' % resp.get(b'algorithm')) + raise common.ErrorResponse(common.HTTP_FORBIDDEN, + b"unknown algorithm") + user = resp[b'username'] + realm = resp[b'realm'] + nonce = resp[b'nonce'] + + ha1 = self._user_hashes.get((user, realm)) + if not ha1: + log.write(b'No hash found for user/realm "%s/%s"' % (user, realm)) + raise common.ErrorResponse(common.HTTP_FORBIDDEN, b"bad user") + + qop = resp.get(b'qop', b'auth') + if qop != b'auth': + log.write(b"Unsupported qop: %s" % qop) + raise common.ErrorResponse(common.HTTP_FORBIDDEN, b"bad qop") + + cnonce, ncvalue = resp.get(b'cnonce'), resp.get(b'nc') + if not cnonce or not ncvalue: + log.write(b'No cnonce (%s) or ncvalue (%s)' % (cnonce, ncvalue)) + raise common.ErrorResponse(common.HTTP_FORBIDDEN, b"no cnonce") + + a2 = b'%s:%s' % (req.method, resp[b'uri']) + noncebit = b"%s:%s:%s:%s:%s" % (nonce, ncvalue, cnonce, qop, h(a2)) + + respdig = kd(ha1, noncebit) + if respdig != resp[b'response']: + log.write(b'User/realm "%s/%s" gave %s, but expected %s' + % (user, realm, resp[b'response'], respdig)) + return False + + return True def perform_authentication(hgweb, req, op): auth = req.headers.get(b'Authorization')