##// END OF EJS Templates
tests: add code to handle HTTP digests on the server side...
Matt Harbison -
r41727:ccaa5286 default
parent child Browse files
Show More
@@ -1,8 +1,89 b''
1 from __future__ import absolute_import
1 from __future__ import absolute_import
2
2
3 import base64
3 import base64
4 import hashlib
4
5
5 from mercurial.hgweb import common
6 from mercurial.hgweb import common
7 from mercurial import (
8 node,
9 )
10
11 def parse_keqv_list(req, l):
12 """Parse list of key=value strings where keys are not duplicated."""
13 parsed = {}
14 for elt in l:
15 k, v = elt.split(b'=', 1)
16 if v[0:1] == b'"' and v[-1:] == b'"':
17 v = v[1:-1]
18 parsed[k] = v
19 return parsed
20
21 class digestauthserver(object):
22 def __init__(self):
23 self._user_hashes = {}
24
25 def gethashers(self):
26 def _md5sum(x):
27 m = hashlib.md5()
28 m.update(x)
29 return node.hex(m.digest())
30
31 h = _md5sum
32
33 kd = lambda s, d, h=h: h(b"%s:%s" % (s, d))
34 return h, kd
35
36 def adduser(self, user, password, realm):
37 h, kd = self.gethashers()
38 a1 = h(b'%s:%s:%s' % (user, realm, password))
39 self._user_hashes[(user, realm)] = a1
40
41 def makechallenge(self, realm):
42 # We aren't testing the protocol here, just that the bytes make the
43 # proper round trip. So hardcoded seems fine.
44 nonce = b'064af982c5b571cea6450d8eda91c20d'
45 return b'realm="%s", nonce="%s", algorithm=MD5, qop="auth"' % (realm,
46 nonce)
47
48 def checkauth(self, req, header):
49 log = req.rawenv[b'wsgi.errors']
50
51 h, kd = self.gethashers()
52 resp = parse_keqv_list(req, header.split(b', '))
53
54 if resp.get(b'algorithm', b'MD5').upper() != b'MD5':
55 log.write(b'Unsupported algorithm: %s' % resp.get(b'algorithm'))
56 raise common.ErrorResponse(common.HTTP_FORBIDDEN,
57 b"unknown algorithm")
58 user = resp[b'username']
59 realm = resp[b'realm']
60 nonce = resp[b'nonce']
61
62 ha1 = self._user_hashes.get((user, realm))
63 if not ha1:
64 log.write(b'No hash found for user/realm "%s/%s"' % (user, realm))
65 raise common.ErrorResponse(common.HTTP_FORBIDDEN, b"bad user")
66
67 qop = resp.get(b'qop', b'auth')
68 if qop != b'auth':
69 log.write(b"Unsupported qop: %s" % qop)
70 raise common.ErrorResponse(common.HTTP_FORBIDDEN, b"bad qop")
71
72 cnonce, ncvalue = resp.get(b'cnonce'), resp.get(b'nc')
73 if not cnonce or not ncvalue:
74 log.write(b'No cnonce (%s) or ncvalue (%s)' % (cnonce, ncvalue))
75 raise common.ErrorResponse(common.HTTP_FORBIDDEN, b"no cnonce")
76
77 a2 = b'%s:%s' % (req.method, resp[b'uri'])
78 noncebit = b"%s:%s:%s:%s:%s" % (nonce, ncvalue, cnonce, qop, h(a2))
79
80 respdig = kd(ha1, noncebit)
81 if respdig != resp[b'response']:
82 log.write(b'User/realm "%s/%s" gave %s, but expected %s'
83 % (user, realm, resp[b'response'], respdig))
84 return False
85
86 return True
6
87
7 def perform_authentication(hgweb, req, op):
88 def perform_authentication(hgweb, req, op):
8 auth = req.headers.get(b'Authorization')
89 auth = req.headers.get(b'Authorization')
General Comments 0
You need to be logged in to leave comments. Login now