Source code for commands.create_and_validate_test_certs

"""Management command to create some certificates for testing and verifying them using OpenSSL."""

# ruff: noqa: T201  # print is fine in management commands

from __future__ import annotations

import datetime
import ipaddress
import shutil
import subprocess
from pathlib import Path
from typing import TYPE_CHECKING

from cryptography import x509
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.serialization import BestAvailableEncryption, pkcs12
from cryptography.x509.oid import NameOID
from django.core.management.base import BaseCommand
from trustpoint_core.oid import KeyPairGenerator, NamedCurve, PublicKeyAlgorithmOid, PublicKeyInfo

from .base_commands import CertificateCreationCommandMixin

if TYPE_CHECKING:
    from cryptography.hazmat.primitives.asymmetric import ec, rsa


class Command(CertificateCreationCommandMixin, BaseCommand):
    """Creates a certificate chain and validates it using OpenSSL.

    For every configured public key algorithm a three-tier chain (root CA, issuing CA and
    end-entity certificate) is generated, written to disk and then verified by invoking the
    ``openssl verify`` command line tool.
    """

    help = 'Creates a certificate chain and validates it using OpenSSL.'

    def _create_cert_chain(self, algorithm: PublicKeyInfo, path: Path) -> None:
        """Creates a root CA, issuing CA and end-entity certificate and stores them in ``path``.

        Besides the per-certificate key and certificate PEM files written by
        :meth:`_create_certificate`, two bundle files are written:

        * ``<algorithm>.p12``: PKCS#12 container holding the end-entity key and certificate
          together with both CA certificates, encrypted with the password ``password``.
        * ``<algorithm>-chain.pem``: root, issuing and end-entity certificate concatenated
          (in that order) as PEM.

        Args:
            algorithm: The public key algorithm used for every key pair of the chain.
            path: The directory the files are written to.
        """
        pem_root_cert, root_cert, root_private_key = self._create_certificate(
            common_name=f'{algorithm}-root-ca',
            algorithm=algorithm,
            path=path,
        )
        pem_issuing_cert, issuing_cert, issuing_private_key = self._create_certificate(
            common_name=f'{algorithm}-issuing-ca',
            algorithm=algorithm,
            path=path,
            issuer=root_cert,
            issuer_priv_key=root_private_key,
        )
        pem_ee_cert, ee_cert, ee_private_key = self._create_certificate(
            common_name=f'{algorithm}-ee',
            algorithm=algorithm,
            path=path,
            issuer=issuing_cert,
            issuer_priv_key=issuing_private_key,
        )

        p12 = pkcs12.serialize_key_and_certificates(
            name=b'',
            key=ee_private_key,
            cert=ee_cert,
            cas=[root_cert, issuing_cert],
            encryption_algorithm=BestAvailableEncryption(b'password'),
        )
        (path / f'{algorithm}.p12').write_bytes(p12)

        cert_chain = pem_root_cert + pem_issuing_cert + pem_ee_cert
        (path / f'{algorithm}-chain.pem').write_bytes(cert_chain.encode())

    @staticmethod
    def _build_test_general_names(
        *,
        email: str,
        dns_name: str,
        uri: str,
        directory_common_name: str,
        directory_organization: str,
    ) -> list[x509.GeneralName]:
        """Builds a list containing one general name of every kind used by the test certificates.

        The IP addresses, IP networks, registered ID and other-name entries are fixed; only the
        values passed as arguments differ between the issuer and subject alternative names.

        Args:
            email: Value of the RFC 822 name entry.
            dns_name: Value of the DNS name entry.
            uri: Value of the uniform resource identifier entry.
            directory_common_name: Common name of the directory name entry.
            directory_organization: Organization name of the directory name entry.

        Returns:
            The general names in the order they are placed into the extension.
        """
        # Arbitrary DER bytes, only used as the opaque value of the OtherName entry.
        some_arbitrary_der_as_hex = (
            '30413130302E06035504030C275472757374506F696E7420526F6F74204341202D2'
            '0534543503235365231202D20534841323536310D300B060355040513044E6F6E65'
        )
        some_arbitrary_der = bytes.fromhex(some_arbitrary_der_as_hex)

        return [
            x509.RFC822Name(email),
            x509.DNSName(dns_name),
            x509.UniformResourceIdentifier(uri),
            x509.IPAddress(ipaddress.IPv4Address('127.0.0.1')),
            x509.IPAddress(ipaddress.IPv6Address('2001:0db8:85a3:0000:0000:8a2e:0370:7334')),
            x509.IPAddress(ipaddress.IPv4Network('192.168.127.12/24', strict=False)),
            x509.IPAddress(ipaddress.IPv6Network('2001:db8:1234::/48')),
            x509.RegisteredID(x509.ObjectIdentifier('2.5.4.3')),
            x509.OtherName(type_id=x509.ObjectIdentifier('2.5.4.3'), value=some_arbitrary_der),
            x509.DirectoryName(
                x509.Name(
                    [
                        x509.NameAttribute(NameOID.COMMON_NAME, directory_common_name),
                        x509.NameAttribute(NameOID.ORGANIZATION_NAME, directory_organization),
                    ]
                )
            ),
        ]

    @classmethod
    def _create_certificate(  # noqa: PLR0913
        cls,
        common_name: str,
        algorithm: PublicKeyInfo,
        path: Path,
        issuer: None | x509.Certificate = None,
        issuer_priv_key: None | rsa.RSAPrivateKey | ec.EllipticCurvePrivateKey = None,
        validity_days: int = 365,
    ) -> tuple[str, x509.Certificate, rsa.RSAPrivateKey | ec.EllipticCurvePrivateKey]:
        """Creates a key pair and a certificate for it and writes both to ``path`` as PEM.

        Without an issuer the certificate is self-signed and becomes a CA with path length 1.
        If the issuer's basic constraints carry path length 1, the new certificate becomes a CA
        with path length 0. In every other case a non-CA certificate is created.

        The files ``<common_name>-key.pem`` (unencrypted PKCS#8) and ``<common_name>-cert.pem``
        are written to ``path``.

        Args:
            common_name: Common name of the subject; also used as the file name prefix.
            algorithm: The public key algorithm the new key pair is generated for.
            path: The directory the key and certificate files are written to.
            issuer: The issuing certificate, or None to create a self-signed certificate.
            issuer_priv_key: The private key matching ``issuer``, or None for self-signed.
            validity_days: Number of days from now until the certificate expires.

        Returns:
            The certificate as PEM string, the certificate object and the generated private key.

        Raises:
            ValueError: If only one of ``issuer`` and ``issuer_priv_key`` is provided.
        """
        # The issuer name and the signing key must come from the same source, otherwise the
        # resulting certificate could never be verified.
        if (issuer is None) != (issuer_priv_key is None):
            err_msg = 'issuer and issuer_priv_key must either both be provided or both be omitted.'
            raise ValueError(err_msg)

        private_key = KeyPairGenerator.generate_key_pair_for_public_key_info(algorithm)

        subject_name = x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, common_name)])
        issuer_name = subject_name if issuer is None else issuer.subject

        # Take the current time once so that both ends of the validity window share one reference.
        now = datetime.datetime.now(tz=datetime.UTC)
        one_day = datetime.timedelta(days=1)

        builder = (
            x509.CertificateBuilder()
            .subject_name(subject_name)
            .issuer_name(issuer_name)
            .not_valid_before(now - one_day)  # validity starts one day in the past
            .not_valid_after(now + one_day * validity_days)
            .serial_number(x509.random_serial_number())
            .public_key(private_key.public_key())
        )

        # ------------------------------------------------- Extensions -------------------------------------------------

        if issuer is None:
            ca, path_length = True, 1
        elif issuer.extensions.get_extension_for_class(x509.BasicConstraints).value.path_length == 1:
            ca, path_length = True, 0
        else:
            ca, path_length = False, None

        builder = builder.add_extension(x509.BasicConstraints(ca=ca, path_length=path_length), critical=True)

        # Every key usage bit is set on every certificate, independent of its role in the chain.
        builder = builder.add_extension(
            x509.KeyUsage(
                digital_signature=True,
                content_commitment=True,
                key_encipherment=True,
                data_encipherment=True,
                key_agreement=True,
                key_cert_sign=True,
                crl_sign=True,
                encipher_only=True,
                decipher_only=True,
            ),
            critical=True,
        )

        builder = builder.add_extension(
            x509.IssuerAlternativeName(
                cls._build_test_general_names(
                    email='trustpoint@trustpoint.de',
                    dns_name='trustpoint.de',
                    uri='https://trustpoint.de',
                    directory_common_name='Trustpoint Model Test',
                    directory_organization='Trustpoint',
                )
            ),
            critical=False,
        )

        builder = builder.add_extension(
            x509.SubjectAlternativeName(
                cls._build_test_general_names(
                    email='subject@trustpoint.de',
                    dns_name='subject.trustpoint.de',
                    uri='https://subject.trustpoint.de',
                    directory_common_name='Subject Trustpoint Model Test',
                    directory_organization='Subject Trustpoint',
                )
            ),
            critical=False,
        )

        # Self-signed certificates are signed with their own key, all others with the issuer's key.
        signing_key = private_key if issuer_priv_key is None else issuer_priv_key
        certificate = builder.sign(private_key=signing_key, algorithm=hashes.SHA256())

        pem_priv_key = private_key.private_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PrivateFormat.PKCS8,
            encryption_algorithm=serialization.NoEncryption(),
        )
        pem_cert = certificate.public_bytes(serialization.Encoding.PEM)

        (path / f'{common_name}-key.pem').write_bytes(pem_priv_key)
        (path / f'{common_name}-cert.pem').write_bytes(pem_cert)

        return pem_cert.decode(), certificate, private_key

    @staticmethod
    def _create_trust_store(algorithms: list[PublicKeyInfo], path: Path) -> None:
        """Concatenates the chain files of all algorithms into ``trust-store.pem``.

        Args:
            algorithms: The algorithms whose ``<algorithm>-chain.pem`` files are read from ``path``.
            path: The directory containing the chain files; the trust store is written here too.
        """
        chains = [(path / f'{algo}-chain.pem').read_text(encoding='utf-8') for algo in algorithms]
        (path / 'trust-store.pem').write_text(''.join(chains), encoding='utf-8')

    def handle(self, *_args: tuple[str], **_kwargs: dict[str, str]) -> None:
        """Executes the command.

        Recreates the ``tests/data/certs`` directory from scratch, generates one certificate
        chain per public key algorithm plus a combined trust store and verifies every chain
        with ``openssl verify``.

        Raises:
            subprocess.CalledProcessError: If OpenSSL exits with a non-zero status.
        """
        tests_data_path = Path(__file__).parent.parent.parent.parent.parent / Path('tests/data/certs')

        # Start from an empty directory; parents=True also covers a missing tests/data directory.
        shutil.rmtree(tests_data_path, ignore_errors=True)
        tests_data_path.mkdir(parents=True, exist_ok=True)

        public_key_algorithms = [
            PublicKeyInfo(public_key_algorithm_oid=PublicKeyAlgorithmOid.RSA, key_size=4096),
            PublicKeyInfo(public_key_algorithm_oid=PublicKeyAlgorithmOid.ECC, named_curve=NamedCurve.SECP256R1),
        ]

        for algo in public_key_algorithms:
            self._create_cert_chain(algorithm=algo, path=tests_data_path)

        self._create_trust_store(algorithms=public_key_algorithms, path=tests_data_path)

        for algo in public_key_algorithms:
            # The root CA is the trust anchor, the issuing CA is supplied as untrusted intermediate.
            cmd = (
                'openssl',
                'verify',
                '-CAfile',
                f'{tests_data_path}/{algo}-root-ca-cert.pem',
                '-untrusted',
                f'{tests_data_path}/{algo}-issuing-ca-cert.pem',
                f'{tests_data_path}/{algo}-ee-cert.pem',
            )

            print(f'Created certificate chain with {algo} and SHA256.')
            print(f'Verifying certificate chain with {algo} and SHA256.')
            print(subprocess.check_output(cmd).decode())  # noqa: S603