ssh_key.py
146 lines
| 5.1 KiB
| text/x-python
|
PythonLexer
r5088 | # Copyright (C) 2013-2023 RhodeCode GmbH | |||
r1993 | # | |||
# This program is free software: you can redistribute it and/or modify | ||||
# it under the terms of the GNU Affero General Public License, version 3 | ||||
# (only), as published by the Free Software Foundation. | ||||
# | ||||
# This program is distributed in the hope that it will be useful, | ||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of | ||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||||
# GNU General Public License for more details. | ||||
# | ||||
# You should have received a copy of the GNU Affero General Public License | ||||
# along with this program. If not, see <http://www.gnu.org/licenses/>. | ||||
# | ||||
# This program is dual-licensed. If you wish to learn more about the | ||||
# RhodeCode Enterprise Edition, including its added features, Support services, | ||||
# and proprietary license terms, please see https://rhodecode.com/licenses/ | ||||
import logging | ||||
import traceback | ||||
import sshpubkeys | ||||
import sshpubkeys.exceptions | ||||
r3520 | from cryptography.hazmat.primitives.asymmetric import rsa | |||
from cryptography.hazmat.primitives import serialization as crypto_serialization | ||||
from cryptography.hazmat.backends import default_backend as crypto_default_backend | ||||
r5070 | from rhodecode.lib.str_utils import safe_bytes, safe_str | |||
r1993 | from rhodecode.model import BaseModel | |||
from rhodecode.model.db import UserSshKeys | ||||
from rhodecode.model.meta import Session | ||||
r3520 | ||||
r1993 | log = logging.getLogger(__name__) | |||
class SshKeyModel(BaseModel): | ||||
cls = UserSshKeys | ||||
r4238 | DEFAULT_PRIVATE_KEY_FORMAT = 'pkcs8' | |||
r1993 | ||||
def parse_key(self, key_data): | ||||
""" | ||||
print(ssh.bits) # 768 | ||||
print(ssh.hash_md5()) # 56:84:1e:90:08:3b:60:c7:29:70:5f:5e:25:a6:3b:86 | ||||
print(ssh.hash_sha256()) # SHA256:xk3IEJIdIoR9MmSRXTP98rjDdZocmXJje/28ohMQEwM | ||||
print(ssh.hash_sha512()) # SHA512:1C3lNBhjpDVQe39hnyy+xvlZYU3IPwzqK1rVneGavy6O3/ebjEQSFvmeWoyMTplIanmUK1hmr9nA8Skmj516HA | ||||
print(ssh.comment) # ojar@ojar-laptop | ||||
print(ssh.options_raw) # None (string of optional options at the beginning of public key) | ||||
print(ssh.options) # None (options as a dictionary, parsed and validated) | ||||
:param key_data: | ||||
:return: | ||||
""" | ||||
ssh = sshpubkeys.SSHKey(strict_mode=True) | ||||
try: | ||||
ssh.parse(key_data) | ||||
return ssh | ||||
except sshpubkeys.exceptions.InvalidKeyException as err: | ||||
log.error("Invalid key: %s", err) | ||||
raise | ||||
except NotImplementedError as err: | ||||
log.error("Invalid key type: %s", err) | ||||
raise | ||||
except Exception as err: | ||||
log.error("Key Parse error: %s", err) | ||||
raise | ||||
r4238 | def generate_keypair(self, comment=None, private_format=DEFAULT_PRIVATE_KEY_FORMAT): | |||
r1993 | ||||
r3520 | key = rsa.generate_private_key( | |||
backend=crypto_default_backend(), | ||||
public_exponent=65537, | ||||
key_size=2048 | ||||
) | ||||
r4238 | if private_format == self.DEFAULT_PRIVATE_KEY_FORMAT: | |||
private_format = crypto_serialization.PrivateFormat.PKCS8 | ||||
else: | ||||
# legacy format that can be used by older systems, use if pkcs8 have | ||||
# problems | ||||
private_format = crypto_serialization.PrivateFormat.TraditionalOpenSSL | ||||
r3520 | private_key = key.private_bytes( | |||
crypto_serialization.Encoding.PEM, | ||||
r4238 | private_format, | |||
r3520 | crypto_serialization.NoEncryption()) | |||
r5070 | private_key = safe_str(private_key) | |||
r3520 | public_key = key.public_key().public_bytes( | |||
crypto_serialization.Encoding.OpenSSH, | ||||
crypto_serialization.PublicFormat.OpenSSH | ||||
) | ||||
r5070 | public_key = safe_str(public_key) | |||
r3520 | ||||
r1993 | if comment: | |||
r3520 | public_key = public_key + " " + comment | |||
return private_key, public_key | ||||
r1993 | ||||
def create(self, user, fingerprint, key_data, description): | ||||
""" | ||||
""" | ||||
user = self._get_user(user) | ||||
new_ssh_key = UserSshKeys() | ||||
new_ssh_key.ssh_key_fingerprint = fingerprint | ||||
new_ssh_key.ssh_key_data = key_data | ||||
new_ssh_key.user_id = user.user_id | ||||
new_ssh_key.description = description | ||||
Session().add(new_ssh_key) | ||||
return new_ssh_key | ||||
def delete(self, ssh_key_id, user=None): | ||||
""" | ||||
Deletes given api_key, if user is set it also filters the object for | ||||
deletion by given user. | ||||
""" | ||||
ssh_key = UserSshKeys.query().filter( | ||||
UserSshKeys.ssh_key_id == ssh_key_id) | ||||
if user: | ||||
user = self._get_user(user) | ||||
ssh_key = ssh_key.filter(UserSshKeys.user_id == user.user_id) | ||||
ssh_key = ssh_key.scalar() | ||||
if ssh_key: | ||||
try: | ||||
Session().delete(ssh_key) | ||||
except Exception: | ||||
log.error(traceback.format_exc()) | ||||
raise | ||||
def get_ssh_keys(self, user): | ||||
user = self._get_user(user) | ||||
user_ssh_keys = UserSshKeys.query()\ | ||||
.filter(UserSshKeys.user_id == user.user_id) | ||||
user_ssh_keys = user_ssh_keys.order_by(UserSshKeys.ssh_key_id) | ||||
return user_ssh_keys | ||||
def get_ssh_key_by_fingerprint(self, ssh_key_fingerprint): | ||||
user_ssh_key = UserSshKeys.query()\ | ||||
.filter(UserSshKeys.ssh_key_fingerprint == ssh_key_fingerprint)\ | ||||
.first() | ||||
return user_ssh_key | ||||