# diff --git a/mercurial/win32.py b/mercurial/win32.py
# --- a/mercurial/win32.py
# +++ b/mercurial/win32.py
# @@ -22,6 +22,7 @@ from . import (
_kernel32 = ctypes.windll.kernel32
_advapi32 = ctypes.windll.advapi32
_user32 = ctypes.windll.user32
_crypt32 = ctypes.windll.crypt32

_BOOL = ctypes.c_long
_WORD = ctypes.c_ushort
# @@ -31,6 +32,7 @@ from . import (
_LPCSTR = _LPSTR = ctypes.c_char_p
_HANDLE = ctypes.c_void_p
_HWND = _HANDLE
# opaque pointer to a CERT_CONTEXT; only ever passed back to crypt32
_PCCERT_CONTEXT = ctypes.c_void_p

_INVALID_HANDLE_VALUE = _HANDLE(-1).value
# @@ -134,8 +136,74 @@ class _CONSOLE_SCREEN_BUFFER_INFO(ctypes
_STD_OUTPUT_HANDLE = _DWORD(-11).value
_STD_ERROR_HANDLE = _DWORD(-12).value

# CERT_TRUST_STATUS dwErrorStatus
CERT_TRUST_IS_PARTIAL_CHAIN = 0x10000

# CertCreateCertificateContext encodings
X509_ASN_ENCODING = 0x00000001
PKCS_7_ASN_ENCODING = 0x00010000

# These structs are only complete enough to achieve what we need.
class CERT_CHAIN_CONTEXT(ctypes.Structure):
    _fields_ = (
        ("cbSize", _DWORD),

        # CERT_TRUST_STATUS struct (two DWORDs, so flattening it here does
        # not change the layout)
        ("dwErrorStatus", _DWORD),
        ("dwInfoStatus", _DWORD),

        ("cChain", _DWORD),
        ("rgpChain", ctypes.c_void_p),
        ("cLowerQualityChainContext", _DWORD),
        ("rgpLowerQualityChainContext", ctypes.c_void_p),
        ("fHasRevocationFreshnessTime", _BOOL),
        ("dwRevocationFreshnessTime", _DWORD),
    )

class CERT_ENHKEY_USAGE(ctypes.Structure):
    _fields_ = (
        ("cUsageIdentifier", _DWORD),
        ("rgpszUsageIdentifier", ctypes.c_void_p), # LPSTR *
    )

class CERT_USAGE_MATCH(ctypes.Structure):
    # CERT_ENHKEY_USAGE contains a pointer, so it must be embedded as a real
    # nested structure: that way ctypes inserts the same alignment padding
    # after dwType that the C compiler does.  Listing its members directly
    # in this struct misplaces them on 64-bit builds.  ``_anonymous_`` keeps
    # ``cUsageIdentifier`` and ``rgpszUsageIdentifier`` reachable as
    # attributes of CERT_USAGE_MATCH itself.
    _anonymous_ = ("Usage",)
    _fields_ = (
        ("dwType", _DWORD),
        ("Usage", CERT_ENHKEY_USAGE),
    )

class CERT_CHAIN_PARA(ctypes.Structure):
    _fields_ = (
        ("cbSize", _DWORD),
        ("RequestedUsage", CERT_USAGE_MATCH),
        ("RequestedIssuancePolicy", CERT_USAGE_MATCH),
        ("dwUrlRetrievalTimeout", _DWORD),
        ("fCheckRevocationFreshnessTime", _BOOL),
        ("dwRevocationFreshnessTime", _DWORD),
        ("pftCacheResync", ctypes.c_void_p), # LPFILETIME
        ("pStrongSignPara", ctypes.c_void_p), # PCCERT_STRONG_SIGN_PARA
        ("dwStrongSignFlags", _DWORD),
    )

# types of parameters of C functions used
# (required by pypy)
_crypt32.CertCreateCertificateContext.argtypes = [_DWORD, # cert encoding
                                                  ctypes.c_char_p, # cert
                                                  _DWORD] # cert size
_crypt32.CertCreateCertificateContext.restype = _PCCERT_CONTEXT

_crypt32.CertGetCertificateChain.argtypes = [
    ctypes.c_void_p, # HCERTCHAINENGINE
    _PCCERT_CONTEXT,
    ctypes.c_void_p, # LPFILETIME
    ctypes.c_void_p, # HCERTSTORE
    ctypes.c_void_p, # PCERT_CHAIN_PARA
    _DWORD,
    ctypes.c_void_p, # LPVOID
    ctypes.c_void_p  # PCCERT_CHAIN_CONTEXT *
    ]
_crypt32.CertGetCertificateChain.restype = _BOOL

# CertFreeCertificateChain is called by checkcertificatechain() and so needs
# a prototype like every other C function used here.  It returns VOID, hence
# the None restype.
_crypt32.CertFreeCertificateChain.argtypes = [
    ctypes.POINTER(CERT_CHAIN_CONTEXT) # PCCERT_CHAIN_CONTEXT
    ]
_crypt32.CertFreeCertificateChain.restype = None

_crypt32.CertFreeCertificateContext.argtypes = [_PCCERT_CONTEXT]
_crypt32.CertFreeCertificateContext.restype = _BOOL

_kernel32.CreateFileA.argtypes = [_LPCSTR, _DWORD, _DWORD, ctypes.c_void_p,
    _DWORD, _DWORD, _HANDLE]
_kernel32.CreateFileA.restype = _HANDLE

# @@ -234,6 +302,51 @@ def _getfileinfo(name):
# (truncated patch context, kept for reference only)
#     finally:
#         _kernel32.CloseHandle(fh)

def checkcertificatechain(cert, build=True):
    '''Tests the given certificate to see if there is a complete chain to a
    trusted root certificate.  As a side effect, missing certificates are
    downloaded and installed unless ``build=False``.  True is returned if a
    chain to a trusted root exists (even if built on the fly), otherwise
    False.  NB: A chain to a trusted root does NOT imply that the certificate
    is valid.

    ``cert`` is the encoded certificate as a byte string; it is handed to
    CertCreateCertificateContext with X509_ASN_ENCODING.  A failure of
    either crypt32 call is reported through ``_raiseoserror``.
    '''

    # NULL pointer for now; CertGetCertificateChain stores the chain context
    # it allocates here.
    pchainctx = ctypes.POINTER(CERT_CHAIN_CONTEXT)()

    # Zero-filled apart from cbSize, i.e. no usage or policy constraints.
    chainpara = CERT_CHAIN_PARA(cbSize=ctypes.sizeof(CERT_CHAIN_PARA),
                                RequestedUsage=CERT_USAGE_MATCH())

    certctx = _crypt32.CertCreateCertificateContext(X509_ASN_ENCODING, cert,
                                                    len(cert))
    if certctx is None:
        _raiseoserror('CertCreateCertificateContext')

    flags = 0

    if not build:
        flags |= 0x100  # CERT_CHAIN_DISABLE_AUTH_ROOT_AUTO_UPDATE

    try:
        # Building the certificate chain will update root certs as necessary.
        if not _crypt32.CertGetCertificateChain(None,      # hChainEngine
                                                certctx,   # pCertContext
                                                None,      # pTime
                                                None,      # hAdditionalStore
                                                ctypes.byref(chainpara),
                                                flags,
                                                None,      # pvReserved
                                                ctypes.byref(pchainctx)):
            _raiseoserror('CertGetCertificateChain')

        chainctx = pchainctx.contents

        # Only the "partial chain" bit matters here; other error bits
        # (expiry, revocation, ...) are deliberately ignored.
        return (chainctx.dwErrorStatus & CERT_TRUST_IS_PARTIAL_CHAIN) == 0
    finally:
        # pchainctx is still NULL if CertGetCertificateChain failed.
        if pchainctx:
            _crypt32.CertFreeCertificateChain(pchainctx)
        _crypt32.CertFreeCertificateContext(certctx)

# (truncated patch context, kept for reference only; the definition
# continues past the visible part of the patch)
# def oslink(src, dst):
#     try:
#         if not _kernel32.CreateHardLinkA(dst, src, None):