##// END OF EJS Templates
encrypt-module: fixed str passed to function that expected bytes
super-admin -
r4946:6cad5217 default
parent child Browse files
Show More
@@ -1,138 +1,138 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2014-2020 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21
22 22 """
23 23 Generic encryption library for RhodeCode
24 24 """
25 25
26 26 import base64
27 27
28 28 from Crypto.Cipher import AES
29 29 from Crypto import Random
30 30 from Crypto.Hash import HMAC, SHA256
31 31
32 from rhodecode.lib.utils2 import safe_str
32 from rhodecode.lib.str_utils import safe_bytes
33 33
34 34
35 35 class SignatureVerificationError(Exception):
36 36 pass
37 37
38 38
39 39 class InvalidDecryptedValue(str):
40 40
41 41 def __new__(cls, content):
42 42 """
43 43 This will generate something like this::
44 44 <InvalidDecryptedValue(QkWusFgLJXR6m42v...)>
45 45 And represent a safe indicator that encryption key is broken
46 46 """
47 47 content = '<{}({}...)>'.format(cls.__name__, content[:16])
48 48 return str.__new__(cls, content)
49 49
50 50
51 51 class AESCipher(object):
52 52 def __init__(self, key, hmac=False, strict_verification=True):
53 53 if not key:
54 54 raise ValueError('passed key variable is empty')
55 55 self.strict_verification = strict_verification
56 56 self.block_size = 32
57 57 self.hmac_size = 32
58 58 self.hmac = hmac
59 59
60 self.key = SHA256.new(safe_str(key)).digest()
60 self.key = SHA256.new(safe_bytes(key)).digest()
61 61 self.hmac_key = SHA256.new(self.key).digest()
62 62
63 63 def verify_hmac_signature(self, raw_data):
64 64 org_hmac_signature = raw_data[-self.hmac_size:]
65 65 data_without_sig = raw_data[:-self.hmac_size]
66 66 recomputed_hmac = HMAC.new(
67 67 self.hmac_key, data_without_sig, digestmod=SHA256).digest()
68 68 return org_hmac_signature == recomputed_hmac
69 69
70 70 def encrypt(self, raw):
71 71 raw = self._pad(raw)
72 72 iv = Random.new().read(AES.block_size)
73 73 cipher = AES.new(self.key, AES.MODE_CBC, iv)
74 74 enc_value = cipher.encrypt(raw)
75 75
76 76 hmac_signature = ''
77 77 if self.hmac:
78 78 # compute hmac+sha256 on iv + enc text, we use
79 79 # encrypt then mac method to create the signature
80 80 hmac_signature = HMAC.new(
81 81 self.hmac_key, iv + enc_value, digestmod=SHA256).digest()
82 82
83 83 return base64.b64encode(iv + enc_value + hmac_signature)
84 84
85 85 def decrypt(self, enc):
86 86 enc_org = enc
87 87 enc = base64.b64decode(enc)
88 88
89 89 if self.hmac and len(enc) > self.hmac_size:
90 90 if self.verify_hmac_signature(enc):
91 91 # cut off the HMAC verification digest
92 92 enc = enc[:-self.hmac_size]
93 93 else:
94 94 if self.strict_verification:
95 95 raise SignatureVerificationError(
96 96 "Encryption signature verification failed. "
97 97 "Please check your secret key, and/or encrypted value. "
98 98 "Secret key is stored as "
99 99 "`rhodecode.encrypted_values.secret` or "
100 100 "`beaker.session.secret` inside .ini file")
101 101
102 102 return InvalidDecryptedValue(enc_org)
103 103
104 104 iv = enc[:AES.block_size]
105 105 cipher = AES.new(self.key, AES.MODE_CBC, iv)
106 106 return self._unpad(cipher.decrypt(enc[AES.block_size:]))
107 107
108 108 def _pad(self, s):
109 109 return (s + (self.block_size - len(s) % self.block_size)
110 110 * chr(self.block_size - len(s) % self.block_size))
111 111
112 112 @staticmethod
113 113 def _unpad(s):
114 114 return s[:-ord(s[len(s)-1:])]
115 115
116 116
117 117 def validate_and_get_enc_data(enc_data, enc_key, enc_strict_mode):
118 118 parts = enc_data.split('$', 3)
119 119 if not len(parts) == 3:
120 120 # probably not encrypted values
121 121 return enc_data
122 122 else:
123 123 if parts[0] != 'enc':
124 124 # parts ok but without our header ?
125 125 return enc_data
126 126
127 127 # at that stage we know it's our encryption
128 128 if parts[1] == 'aes':
129 129 decrypted_data = AESCipher(enc_key).decrypt(parts[2])
130 130 elif parts[1] == 'aes_hmac':
131 131 decrypted_data = AESCipher(
132 132 enc_key, hmac=True,
133 133 strict_verification=enc_strict_mode).decrypt(parts[2])
134 134 else:
135 135 raise ValueError(
136 136 'Encryption type part is wrong, must be `aes` '
137 137 'or `aes_hmac`, got `%s` instead' % (parts[1]))
138 138 return decrypted_data
General Comments 0
You need to be logged in to leave comments. Login now