##// END OF EJS Templates
fix: fixed s3 region. Fixes: RCCE-98
fix: fixed s3 region. Fixes: RCCE-98

File last commit:

r5381:0a39631e default
r5457:1e916a13 default
Show More
encrypt.py
155 lines | 5.1 KiB | text/x-python | PythonLexer
copyrights: updated for 2023
r5088 # Copyright (C) 2014-2023 RhodeCode GmbH
project: added all source files and assets
r1 #
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/
"""
Generic encryption library for RhodeCode
"""
import base64
encryption: unified and rewrote encryption modules to be consistent no matter what algo is used....
r4995 import logging
project: added all source files and assets
r1
from Crypto.Cipher import AES
from Crypto import Random
encryption: Implement a slightly improved AesCipher encryption....
r281 from Crypto.Hash import HMAC, SHA256
project: added all source files and assets
r1
encryption: unified and rewrote encryption modules to be consistent no matter what algo is used....
r4995 from rhodecode.lib.str_utils import safe_bytes, safe_str
from rhodecode.lib.exceptions import signature_verification_error
encryption: Implement a slightly improved AesCipher encryption....
r281
class InvalidDecryptedValue(str):
def __new__(cls, content):
"""
This will generate something like this::
<InvalidDecryptedValue(QkWusFgLJXR6m42v...)>
And represent a safe indicator that encryption key is broken
"""
modernize: updates for python3
r5095 content = f'<{cls.__name__}({content[:16]}...)>'
encryption: Implement a slightly improved AesCipher encryption....
r281 return str.__new__(cls, content)
fix(encryption): don't be strict on enc format when no enc headers are missing....
r5381
encryption: unified and rewrote encryption modules to be consistent no matter what algo is used....
r4995 KEY_FORMAT = b'enc$aes_hmac${1}'
encryption: Implement a slightly improved AesCipher encryption....
r281
project: added all source files and assets
r1 class AESCipher(object):
encryption: unified and rewrote encryption modules to be consistent no matter what algo is used....
r4995
def __init__(self, key: bytes, hmac=False, strict_verification=True):
project: added all source files and assets
r1 if not key:
raise ValueError('passed key variable is empty')
encryption: Implement a slightly improved AesCipher encryption....
r281 self.strict_verification = strict_verification
project: added all source files and assets
r1 self.block_size = 32
encryption: Implement a slightly improved AesCipher encryption....
r281 self.hmac_size = 32
self.hmac = hmac
encrypt-module: fixed str passed to function that expected bytes
r4946 self.key = SHA256.new(safe_bytes(key)).digest()
encryption: Implement a slightly improved AesCipher encryption....
r281 self.hmac_key = SHA256.new(self.key).digest()
def verify_hmac_signature(self, raw_data):
org_hmac_signature = raw_data[-self.hmac_size:]
data_without_sig = raw_data[:-self.hmac_size]
recomputed_hmac = HMAC.new(
self.hmac_key, data_without_sig, digestmod=SHA256).digest()
return org_hmac_signature == recomputed_hmac
project: added all source files and assets
r1
encryption: unified and rewrote encryption modules to be consistent no matter what algo is used....
r4995 def encrypt(self, raw: bytes):
project: added all source files and assets
r1 raw = self._pad(raw)
iv = Random.new().read(AES.block_size)
cipher = AES.new(self.key, AES.MODE_CBC, iv)
encryption: Implement a slightly improved AesCipher encryption....
r281 enc_value = cipher.encrypt(raw)
encryption: unified and rewrote encryption modules to be consistent no matter what algo is used....
r4995 hmac_signature = b''
encryption: Implement a slightly improved AesCipher encryption....
r281 if self.hmac:
# compute hmac+sha256 on iv + enc text, we use
# encrypt then mac method to create the signature
hmac_signature = HMAC.new(
self.hmac_key, iv + enc_value, digestmod=SHA256).digest()
return base64.b64encode(iv + enc_value + hmac_signature)
project: added all source files and assets
r1
encryption: unified and rewrote encryption modules to be consistent no matter what algo is used....
r4995 def decrypt(self, enc, safe=True) -> bytes | InvalidDecryptedValue:
encryption: Implement a slightly improved AesCipher encryption....
r281 enc_org = enc
encryption: unified and rewrote encryption modules to be consistent no matter what algo is used....
r4995 try:
enc = base64.b64decode(enc)
except Exception:
logging.exception('Failed Base64 decode')
raise signature_verification_error('Failed Base64 decode')
encryption: Implement a slightly improved AesCipher encryption....
r281
if self.hmac and len(enc) > self.hmac_size:
if self.verify_hmac_signature(enc):
# cut off the HMAC verification digest
enc = enc[:-self.hmac_size]
else:
encryption: unified and rewrote encryption modules to be consistent no matter what algo is used....
r4995 decrypt_fail = InvalidDecryptedValue(safe_str(enc_org))
if safe:
return decrypt_fail
raise signature_verification_error(decrypt_fail)
encryption: Implement a slightly improved AesCipher encryption....
r281
project: added all source files and assets
r1 iv = enc[:AES.block_size]
cipher = AES.new(self.key, AES.MODE_CBC, iv)
return self._unpad(cipher.decrypt(enc[AES.block_size:]))
def _pad(self, s):
encryption: unified and rewrote encryption modules to be consistent no matter what algo is used....
r4995 block_pad = (self.block_size - len(s) % self.block_size)
return s + block_pad * safe_bytes(chr(block_pad))
project: added all source files and assets
r1
@staticmethod
def _unpad(s):
encryption: added new backend using cryptography + Fernet encryption....
r3522 return s[:-ord(s[len(s)-1:])]
encryption: unified and rewrote encryption modules to be consistent no matter what algo is used....
r4995 def validate_and_decrypt_data(enc_data, enc_key, enc_strict_mode=False, safe=True):
enc_data = safe_str(enc_data)
dbmigrate: fixed migration problems
r5166 if '$' not in enc_data:
# probably not encrypted values
return enc_data
encryption: added new backend using cryptography + Fernet encryption....
r3522 parts = enc_data.split('$', 3)
encryption: unified and rewrote encryption modules to be consistent no matter what algo is used....
r4995 if len(parts) != 3:
dbmigrate: fixed migration problems
r5166 raise ValueError(f'Encrypted Data has invalid format, expected {KEY_FORMAT}, got {parts}, org value: {enc_data}')
encryption: unified and rewrote encryption modules to be consistent no matter what algo is used....
r4995
enc_type = parts[1]
enc_data_part = parts[2]
if parts[0] != 'enc':
dbmigrate: fixed migration problems
r5166 # parts ok but without our header?
encryption: added new backend using cryptography + Fernet encryption....
r3522 return enc_data
encryption: unified and rewrote encryption modules to be consistent no matter what algo is used....
r4995 # at that stage we know it's our encryption
if enc_type == 'aes':
decrypted_data = AESCipher(enc_key).decrypt(enc_data_part, safe=safe)
elif enc_type == 'aes_hmac':
decrypted_data = AESCipher(
enc_key, hmac=True,
strict_verification=enc_strict_mode).decrypt(enc_data_part, safe=safe)
else:
raise ValueError(
f'Encryption type part is wrong, must be `aes` '
f'or `aes_hmac`, got `{enc_type}` instead')
return decrypted_data
def encrypt_data(data, enc_key: bytes):
enc_key = safe_bytes(enc_key)
enc_value = AESCipher(enc_key, hmac=True).encrypt(safe_bytes(data))
return KEY_FORMAT.replace(b'{1}', enc_value)