##// END OF EJS Templates
sslutil: abort properly if no certificate received for https connection...
Mads Kiilerich -
r15817:8f377751 default
parent child Browse files
Show More
@@ -1,138 +1,141
1 1 # sslutil.py - SSL handling for mercurial
2 2 #
3 3 # Copyright 2005, 2006, 2007, 2008 Matt Mackall <mpm@selenic.com>
4 4 # Copyright 2006, 2007 Alexis S. L. Carvalho <alexis@cecm.usp.br>
5 5 # Copyright 2006 Vadim Gelfer <vadim.gelfer@gmail.com>
6 6 #
7 7 # This software may be used and distributed according to the terms of the
8 8 # GNU General Public License version 2 or any later version.
9 9 import os
10 10
11 11 from mercurial import util
12 12 from mercurial.i18n import _
13 13 try:
14 14 # avoid using deprecated/broken FakeSocket in python 2.6
15 15 import ssl
16 16 CERT_REQUIRED = ssl.CERT_REQUIRED
17 17 def ssl_wrap_socket(sock, keyfile, certfile,
18 18 cert_reqs=ssl.CERT_NONE, ca_certs=None):
19 19 sslsocket = ssl.wrap_socket(sock, keyfile, certfile,
20 20 cert_reqs=cert_reqs, ca_certs=ca_certs)
21 21 # check if wrap_socket failed silently because socket had been closed
22 22 # - see http://bugs.python.org/issue13721
23 23 if not sslsocket.cipher():
24 24 raise util.Abort(_('ssl connection failed'))
25 25 return sslsocket
26 26 except ImportError:
27 27 CERT_REQUIRED = 2
28 28
29 29 import socket, httplib
30 30
31 31 def ssl_wrap_socket(sock, key_file, cert_file,
32 32 cert_reqs=CERT_REQUIRED, ca_certs=None):
33 33 if not util.safehasattr(socket, 'ssl'):
34 34 raise util.Abort(_('Python SSL support not found'))
35 35 if ca_certs:
36 36 raise util.Abort(_(
37 37 'certificate checking requires Python 2.6'))
38 38
39 39 ssl = socket.ssl(sock, key_file, cert_file)
40 40 return httplib.FakeSocket(sock, ssl)
41 41
42 42 def _verifycert(cert, hostname):
43 43 '''Verify that cert (in socket.getpeercert() format) matches hostname.
44 44 CRLs is not handled.
45 45
46 46 Returns error message if any problems are found and None on success.
47 47 '''
48 48 if not cert:
49 49 return _('no certificate received')
50 50 dnsname = hostname.lower()
51 51 def matchdnsname(certname):
52 52 return (certname == dnsname or
53 53 '.' in dnsname and certname == '*.' + dnsname.split('.', 1)[1])
54 54
55 55 san = cert.get('subjectAltName', [])
56 56 if san:
57 57 certnames = [value.lower() for key, value in san if key == 'DNS']
58 58 for name in certnames:
59 59 if matchdnsname(name):
60 60 return None
61 61 if certnames:
62 62 return _('certificate is for %s') % ', '.join(certnames)
63 63
64 64 # subject is only checked when subjectAltName is empty
65 65 for s in cert.get('subject', []):
66 66 key, value = s[0]
67 67 if key == 'commonName':
68 68 try:
69 69 # 'subject' entries are unicode
70 70 certname = value.lower().encode('ascii')
71 71 except UnicodeEncodeError:
72 72 return _('IDN in certificate not supported')
73 73 if matchdnsname(certname):
74 74 return None
75 75 return _('certificate is for %s') % certname
76 76 return _('no commonName or subjectAltName found in certificate')
77 77
78 78
79 79 # CERT_REQUIRED means fetch the cert from the server all the time AND
80 80 # validate it against the CA store provided in web.cacerts.
81 81 #
82 82 # We COMPLETELY ignore CERT_REQUIRED on Python <= 2.5, as it's totally
83 83 # busted on those versions.
84 84
85 85 def sslkwargs(ui, host):
86 86 cacerts = ui.config('web', 'cacerts')
87 87 hostfingerprint = ui.config('hostfingerprints', host)
88 88 if cacerts and not hostfingerprint:
89 89 cacerts = util.expandpath(cacerts)
90 90 if not os.path.exists(cacerts):
91 91 raise util.Abort(_('could not find web.cacerts: %s') % cacerts)
92 92 return {'ca_certs': cacerts,
93 93 'cert_reqs': CERT_REQUIRED,
94 94 }
95 95 return {}
96 96
97 97 class validator(object):
98 98 def __init__(self, ui, host):
99 99 self.ui = ui
100 100 self.host = host
101 101
102 102 def __call__(self, sock):
103 103 host = self.host
104 104 cacerts = self.ui.config('web', 'cacerts')
105 105 hostfingerprint = self.ui.config('hostfingerprints', host)
106 106 if not getattr(sock, 'getpeercert', False): # python 2.5 ?
107 107 if hostfingerprint:
108 108 raise util.Abort(_("host fingerprint for %s can't be "
109 109 "verified (Python too old)") % host)
110 110 self.ui.warn(_("warning: certificate for %s can't be verified "
111 111 "(Python too old)\n") % host)
112 112 return
113 113 if not sock.cipher(): # work around http://bugs.python.org/issue13721
114 114 raise util.Abort(_('%s ssl connection error') % host)
115 115 peercert = sock.getpeercert(True)
116 if not peercert:
117 raise util.Abort(_('%s certificate error: '
118 'no certificate received') % host)
116 119 peerfingerprint = util.sha1(peercert).hexdigest()
117 120 nicefingerprint = ":".join([peerfingerprint[x:x + 2]
118 121 for x in xrange(0, len(peerfingerprint), 2)])
119 122 if hostfingerprint:
120 123 if peerfingerprint.lower() != \
121 124 hostfingerprint.replace(':', '').lower():
122 125 raise util.Abort(_('invalid certificate for %s with '
123 126 'fingerprint %s') % (host, nicefingerprint))
124 127 self.ui.debug('%s certificate matched fingerprint %s\n' %
125 128 (host, nicefingerprint))
126 129 elif cacerts:
127 130 msg = _verifycert(sock.getpeercert(), host)
128 131 if msg:
129 132 raise util.Abort(_('%s certificate error: %s') % (host, msg),
130 133 hint=_('configure hostfingerprint %s or use '
131 134 '--insecure to connect insecurely') %
132 135 nicefingerprint)
133 136 self.ui.debug('%s certificate successfully verified\n' % host)
134 137 else:
135 138 self.ui.warn(_('warning: %s certificate with fingerprint %s not '
136 139 'verified (check hostfingerprints or web.cacerts '
137 140 'config setting)\n') %
138 141 (host, nicefingerprint))
General Comments 0
You need to be logged in to leave comments. Login now