"""Contains common functionality for PKI management commands."""# ruff: noqa: T201 # print is fine in management commandsfrom__future__importannotationsfrompathlibimportPathfromtypingimportTYPE_CHECKINGfromcryptographyimportx509fromcryptography.hazmat.primitivesimporthashesfromcryptography.hazmat.primitives.asymmetricimportrsafromcryptography.hazmat.primitives.serializationimport(BestAvailableEncryption,Encoding,NoEncryption,PrivateFormat,pkcs12,)fromcryptography.x509.oidimportNameOIDfrompki.modelsimportCertificateModelfrompki.util.x509importCertificateGeneratorifTYPE_CHECKING:fromtrustpoint_core.typesimportPrivateKey
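class CertificateCreationCommandMixin(CertificateGenerator):
    """Mixin for management commands that create certificates.

    Everything written by the ``store_*`` / ``create_*`` methods is *test
    fixture* material below ``tests/data/issuing_cas`` (resolved relative to
    this file).  The fixed PKCS12 password and the unencrypted end-entity keys
    are therefore intentional and only acceptable for that purpose.
    """

    # Fixture directory, relative to the repository root (five levels above this file).
    _TESTS_DATA_SUBDIR = 'tests/data/issuing_cas'
    # Fixed password protecting the fixture PKCS12 container; echoed to the console after writing.
    _PKCS12_PASSWORD = b'testing321'

    @classmethod
    def _tests_data_path(cls) -> Path:
        """Return the fixture directory, creating it and any missing parents.

        Computed in one place for all methods below.  Creating the directory
        here also covers the methods that previously wrote into it without
        checking that it exists.
        """
        path = Path(__file__).parent.parent.parent.parent.parent / Path(cls._TESTS_DATA_SUBDIR)
        path.mkdir(parents=True, exist_ok=True)
        return path

    @classmethod
    def store_issuing_ca(
        cls,
        issuing_ca_cert: x509.Certificate,
        chain: list[x509.Certificate],
        private_key: PrivateKey,
        filename: str,
    ) -> None:
        """Store the Issuing CA certificate and private key in a PKCS12 file.

        Args:
            issuing_ca_cert: Certificate placed in the PKCS12 ``cert`` slot.
            chain: Additional CA certificates placed in the ``cas`` slot.
            private_key: Private key matching ``issuing_ca_cert``.
            filename: Name of the file inside the fixture directory.  It is
                joined to the directory without validation, so a value with
                path separators or an absolute path ends up outside the
                fixture tree; callers are expected to pass a bare file name.
        """
        issuing_ca_path = cls._tests_data_path() / Path(filename)
        print('\nSaving Issuing CA and Certificates\n')
        p12 = pkcs12.serialize_key_and_certificates(
            name=b'',
            key=private_key,
            cert=issuing_ca_cert,
            cas=chain,
            encryption_algorithm=BestAvailableEncryption(cls._PKCS12_PASSWORD),
        )
        issuing_ca_path.write_bytes(p12)
        print(f'Issuing CA: {issuing_ca_path}')
        print(f'Issuing CA - Password: {cls._PKCS12_PASSWORD.decode()}\n')

    @classmethod
    def store_ee_certs(cls, certs: dict[str, x509.Certificate]) -> None:
        """Store the end entity certificates as .pem files.

        Args:
            certs: Mapping of file stem to certificate; each is written PEM
                encoded as ``<stem>.pem`` in the fixture directory.
        """
        tests_data_path = cls._tests_data_path()
        for name, cert in certs.items():
            cert_path = tests_data_path / Path(f'{name}.pem')
            cert_path.write_bytes(cert.public_bytes(encoding=Encoding.PEM))
            print(f'Stored EE certificate: {cert_path}')

    @classmethod
    def store_ee_keys(cls, keys: dict[str, PrivateKey]) -> None:
        """Store the end entity keys as unencrypted .pem files.

        Args:
            keys: Mapping of file stem to private key; each is written as
                ``<stem>.pem`` (PEM, TraditionalOpenSSL format, no encryption)
                in the fixture directory.

        NOTE: the ``<stem>.pem`` pattern and the directory are the same ones
        ``store_ee_certs`` uses, so a stem present in both mappings makes the
        later call overwrite the earlier file.
        """
        tests_data_path = cls._tests_data_path()
        for name, key in keys.items():
            key_path = tests_data_path / Path(f'{name}.pem')
            key_path.write_bytes(
                key.private_bytes(
                    encoding=Encoding.PEM,
                    format=PrivateFormat.TraditionalOpenSSL,
                    # Fixture keys are deliberately stored in the clear.
                    encryption_algorithm=NoEncryption(),
                )
            )
            # This method writes keys, so the message names a key rather than a certificate.
            print(f'Stored EE key: {key_path}')

    @staticmethod
    def save_ee_certs(certs: dict[str, x509.Certificate]) -> None:
        """Save the end entity certificates in the database.

        Args:
            certs: Mapping of display name to certificate.  The name is used
                only for console output; the certificate is handed to
                ``CertificateModel.save_certificate``.
        """
        for name, cert in certs.items():
            print(f'Saving EE certificate in DB: {name}')
            CertificateModel.save_certificate(cert)

    @classmethod
    def create_csr(cls, number: int) -> None:
        """Create a number of test Certificate Signing Requests.

        Each CSR is signed by a fresh RSA-2048 key, carries the subject
        ``CN=CSR Cert <i>`` and a critical ``BasicConstraints(ca=False)``
        extension, and is written PEM encoded to ``csr<i>.pem`` in the fixture
        directory.  The generating key is not persisted, so the CSRs serve as
        parsing/format fixtures only.

        Args:
            number: How many CSRs to create; zero or a negative value creates none.
        """
        tests_data_path = cls._tests_data_path()
        for i in range(number):
            private_key = rsa.generate_private_key(
                public_exponent=65537,
                key_size=2048,
            )
            builder = (
                x509.CertificateSigningRequestBuilder()
                .subject_name(x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, f'CSR Cert {i}')]))
                .add_extension(x509.BasicConstraints(ca=False, path_length=None), critical=True)
            )
            csr = builder.sign(private_key, hashes.SHA256())
            csr_path = tests_data_path / Path(f'csr{i}.pem')
            csr_path.write_bytes(csr.public_bytes(encoding=Encoding.PEM))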