##// END OF EJS Templates
feat(artifacts): new artifact storage engines allowing an s3 based uploads
feat(artifacts): new artifact storage engines allowing an s3 based uploads

File last commit:

r5381:0a39631e default
r5516:3496180b default
Show More
encrypt.py
155 lines | 5.1 KiB | text/x-python | PythonLexer
# Copyright (C) 2014-2023 RhodeCode GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/
"""
Generic encryption library for RhodeCode
"""
import base64
import logging
from Crypto.Cipher import AES
from Crypto import Random
from Crypto.Hash import HMAC, SHA256
from rhodecode.lib.str_utils import safe_bytes, safe_str
from rhodecode.lib.exceptions import signature_verification_error
class InvalidDecryptedValue(str):
def __new__(cls, content):
"""
This will generate something like this::
<InvalidDecryptedValue(QkWusFgLJXR6m42v...)>
And represent a safe indicator that encryption key is broken
"""
content = f'<{cls.__name__}({content[:16]}...)>'
return str.__new__(cls, content)
KEY_FORMAT = b'enc$aes_hmac${1}'
class AESCipher(object):
def __init__(self, key: bytes, hmac=False, strict_verification=True):
if not key:
raise ValueError('passed key variable is empty')
self.strict_verification = strict_verification
self.block_size = 32
self.hmac_size = 32
self.hmac = hmac
self.key = SHA256.new(safe_bytes(key)).digest()
self.hmac_key = SHA256.new(self.key).digest()
def verify_hmac_signature(self, raw_data):
org_hmac_signature = raw_data[-self.hmac_size:]
data_without_sig = raw_data[:-self.hmac_size]
recomputed_hmac = HMAC.new(
self.hmac_key, data_without_sig, digestmod=SHA256).digest()
return org_hmac_signature == recomputed_hmac
def encrypt(self, raw: bytes):
raw = self._pad(raw)
iv = Random.new().read(AES.block_size)
cipher = AES.new(self.key, AES.MODE_CBC, iv)
enc_value = cipher.encrypt(raw)
hmac_signature = b''
if self.hmac:
# compute hmac+sha256 on iv + enc text, we use
# encrypt then mac method to create the signature
hmac_signature = HMAC.new(
self.hmac_key, iv + enc_value, digestmod=SHA256).digest()
return base64.b64encode(iv + enc_value + hmac_signature)
def decrypt(self, enc, safe=True) -> bytes | InvalidDecryptedValue:
enc_org = enc
try:
enc = base64.b64decode(enc)
except Exception:
logging.exception('Failed Base64 decode')
raise signature_verification_error('Failed Base64 decode')
if self.hmac and len(enc) > self.hmac_size:
if self.verify_hmac_signature(enc):
# cut off the HMAC verification digest
enc = enc[:-self.hmac_size]
else:
decrypt_fail = InvalidDecryptedValue(safe_str(enc_org))
if safe:
return decrypt_fail
raise signature_verification_error(decrypt_fail)
iv = enc[:AES.block_size]
cipher = AES.new(self.key, AES.MODE_CBC, iv)
return self._unpad(cipher.decrypt(enc[AES.block_size:]))
def _pad(self, s):
block_pad = (self.block_size - len(s) % self.block_size)
return s + block_pad * safe_bytes(chr(block_pad))
@staticmethod
def _unpad(s):
return s[:-ord(s[len(s)-1:])]
def validate_and_decrypt_data(enc_data, enc_key, enc_strict_mode=False, safe=True):
enc_data = safe_str(enc_data)
if '$' not in enc_data:
# probably not encrypted values
return enc_data
parts = enc_data.split('$', 3)
if len(parts) != 3:
raise ValueError(f'Encrypted Data has invalid format, expected {KEY_FORMAT}, got {parts}, org value: {enc_data}')
enc_type = parts[1]
enc_data_part = parts[2]
if parts[0] != 'enc':
# parts ok but without our header?
return enc_data
# at that stage we know it's our encryption
if enc_type == 'aes':
decrypted_data = AESCipher(enc_key).decrypt(enc_data_part, safe=safe)
elif enc_type == 'aes_hmac':
decrypted_data = AESCipher(
enc_key, hmac=True,
strict_verification=enc_strict_mode).decrypt(enc_data_part, safe=safe)
else:
raise ValueError(
f'Encryption type part is wrong, must be `aes` '
f'or `aes_hmac`, got `{enc_type}` instead')
return decrypted_data
def encrypt_data(data, enc_key: bytes):
enc_key = safe_bytes(enc_key)
enc_value = AESCipher(enc_key, hmac=True).encrypt(safe_bytes(data))
return KEY_FORMAT.replace(b'{1}', enc_value)