# HG changeset patch
# User Augie Fackler
# Date 2011-05-05 03:08:55
# Node ID 5fa21960b2f4c7f53e53090beef9e3373def27f4
# Parent b230922eb0c312b622dc2610851102bb004d41a4
sslutil: extracted ssl methods from httpsconnection in url.py

This makes it easier to share ssl cert validation with other http
implementations.

The Python <= 2.5 fallback for ssl_wrap_socket uses socket.ssl and
httplib.FakeSocket, so sslutil.py imports socket and httplib in that
branch instead of relying on url.py's imports.

diff --git a/mercurial/sslutil.py b/mercurial/sslutil.py
new file mode 100644
--- /dev/null
+++ b/mercurial/sslutil.py
@@ -0,0 +1,130 @@
+# sslutil.py - SSL handling for mercurial
+#
+# Copyright 2005, 2006, 2007, 2008 Matt Mackall
+# Copyright 2006, 2007 Alexis S. L. Carvalho
+# Copyright 2006 Vadim Gelfer
+#
+# This software may be used and distributed according to the terms of the
+# GNU General Public License version 2 or any later version.
+import os
+
+from mercurial import util
+from mercurial.i18n import _
+try:
+    # avoid using deprecated/broken FakeSocket in python 2.6
+    import ssl
+    ssl_wrap_socket = ssl.wrap_socket
+    CERT_REQUIRED = ssl.CERT_REQUIRED
+except ImportError:
+    CERT_REQUIRED = 2
+
+    # this fallback calls socket.ssl() and httplib.FakeSocket(); url.py had
+    # both modules in scope, but this module has to import them itself
+    import socket, httplib
+
+    def ssl_wrap_socket(sock, key_file, cert_file,
+                        cert_reqs=CERT_REQUIRED, ca_certs=None):
+        if ca_certs:
+            raise util.Abort(_(
+                'certificate checking requires Python 2.6'))
+
+        ssl = socket.ssl(sock, key_file, cert_file)
+        return httplib.FakeSocket(sock, ssl)
+
+def _verifycert(cert, hostname):
+    '''Verify that cert (in socket.getpeercert() format) matches hostname.
+    CRLs is not handled.
+
+    Returns error message if any problems are found and None on success.
+    '''
+    if not cert:
+        return _('no certificate received')
+    dnsname = hostname.lower()
+    def matchdnsname(certname):
+        return (certname == dnsname or
+                '.' in dnsname and certname == '*.' + dnsname.split('.', 1)[1])
+
+    san = cert.get('subjectAltName', [])
+    if san:
+        certnames = [value.lower() for key, value in san if key == 'DNS']
+        for name in certnames:
+            if matchdnsname(name):
+                return None
+        return _('certificate is for %s') % ', '.join(certnames)
+
+    # subject is only checked when subjectAltName is empty
+    for s in cert.get('subject', []):
+        key, value = s[0]
+        if key == 'commonName':
+            try:
+                # 'subject' entries are unicode
+                certname = value.lower().encode('ascii')
+            except UnicodeEncodeError:
+                return _('IDN in certificate not supported')
+            if matchdnsname(certname):
+                return None
+            return _('certificate is for %s') % certname
+    return _('no commonName or subjectAltName found in certificate')
+
+
+# CERT_REQUIRED means fetch the cert from the server all the time AND
+# validate it against the CA store provided in web.cacerts.
+#
+# We COMPLETELY ignore CERT_REQUIRED on Python <= 2.5, as it's totally
+# busted on those versions.
+
+def sslkwargs(ui, host):
+    cacerts = ui.config('web', 'cacerts')
+    hostfingerprint = ui.config('hostfingerprints', host)
+    if cacerts and not hostfingerprint:
+        cacerts = util.expandpath(cacerts)
+        if not os.path.exists(cacerts):
+            raise util.Abort(_('could not find web.cacerts: %s') % cacerts)
+        return {'ca_certs': cacerts,
+                'cert_reqs': CERT_REQUIRED,
+                }
+    return {}
+
+class validator(object):
+    def __init__(self, ui, host):
+        self.ui = ui
+        self.host = host
+
+    def __call__(self, sock):
+        host = self.host
+        cacerts = self.ui.config('web', 'cacerts')
+        hostfingerprint = self.ui.config('hostfingerprints', host)
+        if cacerts and not hostfingerprint:
+            msg = _verifycert(sock.getpeercert(), host)
+            if msg:
+                raise util.Abort(_('%s certificate error: %s '
+                                   '(use --insecure to connect '
+                                   'insecurely)') % (host, msg))
+            self.ui.debug('%s certificate successfully verified\n' % host)
+        else:
+            if getattr(sock, 'getpeercert', False):
+                peercert = sock.getpeercert(True)
+                peerfingerprint = util.sha1(peercert).hexdigest()
+                nicefingerprint = ":".join([peerfingerprint[x:x + 2]
+                    for x in xrange(0, len(peerfingerprint), 2)])
+                if hostfingerprint:
+                    if peerfingerprint.lower() != \
+                            hostfingerprint.replace(':', '').lower():
+                        raise util.Abort(_('invalid certificate for %s '
+                                           'with fingerprint %s') %
+                                         (host, nicefingerprint))
+                    self.ui.debug('%s certificate matched fingerprint %s\n' %
+                                  (host, nicefingerprint))
+                else:
+                    self.ui.warn(_('warning: %s certificate '
+                                   'with fingerprint %s not verified '
+                                   '(check hostfingerprints or web.cacerts '
+                                   'config setting)\n') %
+                                 (host, nicefingerprint))
+            else: # python 2.5 ?
+                if hostfingerprint:
+                    raise util.Abort(_('no certificate for %s with '
+                                       'configured hostfingerprint') % host)
+                self.ui.warn(_('warning: %s certificate not verified '
+                               '(check web.cacerts config setting)\n') %
+                             host)
diff --git a/mercurial/url.py b/mercurial/url.py
--- a/mercurial/url.py
+++ b/mercurial/url.py
@@ -10,7 +10,7 @@
 import urllib, urllib2, httplib, os, socket, cStringIO
 import __builtin__
 from i18n import _
-import keepalive, util
+import keepalive, util, sslutil
 
 def readauthforuri(ui, uri):
     # Read configuration
@@ -202,23 +202,6 @@ def _gen_sendfile(orgsend):
 has_https = hasattr(urllib2, 'HTTPSHandler')
 if has_https:
     try:
-        # avoid using deprecated/broken FakeSocket in python 2.6
-        import ssl
-        _ssl_wrap_socket = ssl.wrap_socket
-        CERT_REQUIRED = ssl.CERT_REQUIRED
-    except ImportError:
-        CERT_REQUIRED = 2
-
-        def _ssl_wrap_socket(sock, key_file, cert_file,
-                             cert_reqs=CERT_REQUIRED, ca_certs=None):
-            if ca_certs:
-                raise util.Abort(_(
-                    'certificate checking requires Python 2.6'))
-
-            ssl = socket.ssl(sock, key_file, cert_file)
-            return httplib.FakeSocket(sock, ssl)
-
-    try:
         _create_connection = socket.create_connection
     except AttributeError:
         _GLOBAL_DEFAULT_TIMEOUT = object()
@@ -257,7 +240,7 @@ class httpconnection(keepalive.HTTPConne
             self.sock.connect((self.host, self.port))
             if _generic_proxytunnel(self):
                 # we do not support client x509 certificates
-                self.sock = _ssl_wrap_socket(self.sock, None, None)
+                self.sock = sslutil.ssl_wrap_socket(self.sock, None, None)
         else:
             keepalive.HTTPConnection.connect(self)
 
@@ -398,41 +381,6 @@ class httphandler(keepalive.HTTPHandler)
         _generic_start_transaction(self, h, req)
         return keepalive.HTTPHandler._start_transaction(self, h, req)
 
-def _verifycert(cert, hostname):
-    '''Verify that cert (in socket.getpeercert() format) matches hostname.
-    CRLs is not handled.
-
-    Returns error message if any problems are found and None on success.
-    '''
-    if not cert:
-        return _('no certificate received')
-    dnsname = hostname.lower()
-    def matchdnsname(certname):
-        return (certname == dnsname or
-                '.' in dnsname and certname == '*.' + dnsname.split('.', 1)[1])
-
-    san = cert.get('subjectAltName', [])
-    if san:
-        certnames = [value.lower() for key, value in san if key == 'DNS']
-        for name in certnames:
-            if matchdnsname(name):
-                return None
-        return _('certificate is for %s') % ', '.join(certnames)
-
-    # subject is only checked when subjectAltName is empty
-    for s in cert.get('subject', []):
-        key, value = s[0]
-        if key == 'commonName':
-            try:
-                # 'subject' entries are unicode
-                certname = value.lower().encode('ascii')
-            except UnicodeEncodeError:
-                return _('IDN in certificate not supported')
-            if matchdnsname(certname):
-                return None
-            return _('certificate is for %s') % certname
-    return _('no commonName or subjectAltName found in certificate')
-
 if has_https:
     class httpsconnection(httplib.HTTPSConnection):
         response_class = keepalive.HTTPResponse
@@ -447,53 +395,10 @@ if has_https:
             if self.realhostport: # use CONNECT proxy
                 _generic_proxytunnel(self)
                 host = self.realhostport.rsplit(':', 1)[0]
-
-            cacerts = self.ui.config('web', 'cacerts')
-            hostfingerprint = self.ui.config('hostfingerprints', host)
-
-            if cacerts and not hostfingerprint:
-                cacerts = util.expandpath(cacerts)
-                if not os.path.exists(cacerts):
-                    raise util.Abort(_('could not find '
-                                       'web.cacerts: %s') % cacerts)
-                self.sock = _ssl_wrap_socket(self.sock, self.key_file,
-                    self.cert_file, cert_reqs=CERT_REQUIRED,
-                    ca_certs=cacerts)
-                msg = _verifycert(self.sock.getpeercert(), host)
-                if msg:
-                    raise util.Abort(_('%s certificate error: %s '
-                                       '(use --insecure to connect '
-                                       'insecurely)') % (host, msg))
-                self.ui.debug('%s certificate successfully verified\n' % host)
-            else:
-                self.sock = _ssl_wrap_socket(self.sock, self.key_file,
-                    self.cert_file)
-                if hasattr(self.sock, 'getpeercert'):
-                    peercert = self.sock.getpeercert(True)
-                    peerfingerprint = util.sha1(peercert).hexdigest()
-                    nicefingerprint = ":".join([peerfingerprint[x:x + 2]
-                        for x in xrange(0, len(peerfingerprint), 2)])
-                    if hostfingerprint:
-                        if peerfingerprint.lower() != \
-                                hostfingerprint.replace(':', '').lower():
-                            raise util.Abort(_('invalid certificate for %s '
-                                               'with fingerprint %s') %
-                                             (host, nicefingerprint))
-                        self.ui.debug('%s certificate matched fingerprint %s\n' %
-                                      (host, nicefingerprint))
-                    else:
-                        self.ui.warn(_('warning: %s certificate '
-                                       'with fingerprint %s not verified '
-                                       '(check hostfingerprints or web.cacerts '
-                                       'config setting)\n') %
-                                     (host, nicefingerprint))
-                else: # python 2.5 ?
-                    if hostfingerprint:
-                        raise util.Abort(_('no certificate for %s with '
-                                           'configured hostfingerprint') % host)
-                    self.ui.warn(_('warning: %s certificate not verified '
-                                   '(check web.cacerts config setting)\n') %
-                                 host)
+            self.sock = sslutil.ssl_wrap_socket(
+                self.sock, self.key_file, self.cert_file,
+                **sslutil.sslkwargs(self.ui, host))
+            sslutil.validator(self.ui, host)(self.sock)
 
     class httpshandler(keepalive.KeepAliveHandler, urllib2.HTTPSHandler):
         def __init__(self, ui):
diff --git a/tests/test-url.py b/tests/test-url.py
--- a/tests/test-url.py
+++ b/tests/test-url.py
@@ -7,7 +7,7 @@ def check(a, b):
 def cert(cn):
     return dict(subject=((('commonName', cn),),))
 
-from mercurial.url import _verifycert
+from mercurial.sslutil import _verifycert
 
 # Test non-wildcard certificates
 check(_verifycert(cert('example.com'), 'example.com'),