Source code for pki.models.issuing_ca

"""Module that contains the IssuingCaModel."""

from __future__ import annotations

import datetime
from typing import TYPE_CHECKING

from cryptography import x509
from cryptography.hazmat.primitives import serialization
from django.core.exceptions import ValidationError
from django.db import models
from django.utils import timezone
from django.utils.translation import gettext_lazy as _
from trustpoint_core import oid
from util.db import CustomDeleteActionModel
from util.field import UniqueNameValidator

from pki.models.certificate import CertificateModel, RevokedCertificateModel
from pki.models.credential import CredentialModel
from cryptography.hazmat.primitives import hashes
from trustpoint.logger import LoggerMixin

if TYPE_CHECKING:
    from django.db.models.query import QuerySet
    from trustpoint_core.serializer import CredentialSerializer
    from util.db import CustomDeleteActionManager


[docs] class IssuingCaModel(LoggerMixin, CustomDeleteActionModel): """Issuing CA Model. This model contains the configurations of all Issuing CAs available within the Trustpoint. """
[docs] class IssuingCaTypeChoice(models.IntegerChoices): """The IssuingCaTypeChoice defines the type of Issuing CA. Depending on the type other fields may be set, e.g. a credential will only be available for local Issuing CAs. """
[docs] AUTOGEN_ROOT = 0, _('Auto-Generated Root')
[docs] AUTOGEN = 1, _('Auto-Generated')
[docs] LOCAL_UNPROTECTED = 2, _('Local-Unprotected')
[docs] LOCAL_PKCS11 = 3, _('Local-PKCS11')
[docs] REMOTE_EST = 4, _('Remote-EST')
[docs] REMOTE_CMP = 5, _('Remote-CMP')
[docs] unique_name = models.CharField( verbose_name=_('Issuing CA Name'), max_length=100, validators=[UniqueNameValidator()], unique=True )
[docs] credential: CredentialModel = models.OneToOneField(CredentialModel, related_name='issuing_cas', on_delete=models.PROTECT)
[docs] issuing_ca_type = models.IntegerField( verbose_name=_('Issuing CA Type'), choices=IssuingCaTypeChoice, null=False, blank=False )
[docs] is_active = models.BooleanField( _('Active'), default=True, )
[docs] created_at = models.DateTimeField(verbose_name=_('Created'), auto_now_add=True)
[docs] updated_at = models.DateTimeField(verbose_name=_('Updated'), auto_now=True)
[docs] last_crl_issued_at = models.DateTimeField(verbose_name=_('Last CRL Issued'), null=True, blank=True)
[docs] crl_pem = models.TextField(editable=False, default='', verbose_name=_('CRL in PEM format'))
[docs] def __str__(self) -> str: """Returns a human-readable string that represents this IssuingCaModel entry. Returns: str: Human-readable string that represents this IssuingCaModel entry. """ return self.unique_name
[docs] def __repr__(self) -> str: """Returns a string representation of the IssuingCaModel instance.""" return f'IssuingCaModel(unique_name={self.unique_name})'
@property
[docs] def common_name(self) -> str: """Returns common name.""" return self.credential.certificate.common_name
@classmethod
[docs] def create_new_issuing_ca( cls, unique_name: str, credential_serializer: CredentialSerializer, issuing_ca_type: IssuingCaModel.IssuingCaTypeChoice, ) -> IssuingCaModel: """Creates a new Issuing CA model and returns it. Args: unique_name: The unique name that will be used to identify the Issuing CA. credential_serializer: The credential as CredentialSerializer instance. It will be normalized and validated, if it is a valid credential to be used as an Issuing CA. issuing_ca_type: The Issuing CA type. Returns: IssuingCaModel: The newly created Issuing CA model. """ ca_cert = credential_serializer.certificate if not ca_cert: raise ValidationError(_('The provided credential is not a valid CA; it does not contain a certificate.')) try: bc_extension = ca_cert.extensions.get_extension_for_class(x509.BasicConstraints) except x509.ExtensionNotFound: raise ValidationError(_('The provided certificate is not a valid CA certificate; it does not contain a Basic Constraints extension.')) if not bc_extension.value.ca: raise ValidationError(_('The provided certificate is not a valid CA certificate; it is an End Entity certificate.')) issuing_ca_types = ( cls.IssuingCaTypeChoice.AUTOGEN_ROOT, cls.IssuingCaTypeChoice.AUTOGEN, cls.IssuingCaTypeChoice.LOCAL_UNPROTECTED, cls.IssuingCaTypeChoice.LOCAL_PKCS11, ) if issuing_ca_type in issuing_ca_types: credential_type = CredentialModel.CredentialTypeChoice.ISSUING_CA else: exc_msg = f'Issuing CA Type {issuing_ca_type} is not yet supported.' raise ValueError(exc_msg) credential_model = CredentialModel.save_credential_serializer( credential_serializer=credential_serializer, credential_type=credential_type ) issuing_ca = cls( unique_name=unique_name, credential=credential_model, issuing_ca_type=issuing_ca_type, ) issuing_ca.save() return issuing_ca
[docs] def issue_crl(self) -> bool: """Issues a CRL with revoked certificates issued by this CA.""" self.logger.debug('Generating CRL for CA %s', self.unique_name) try: crl_issued_at = timezone.now() self.last_crl_issued_at = crl_issued_at ca_subject = self.credential.certificate.get_certificate_serializer().as_crypto().subject crl_builder = x509.CertificateRevocationListBuilder( issuer_name=ca_subject, last_update=crl_issued_at, next_update=crl_issued_at + datetime.timedelta(hours=24), # (minutes=self.next_crl_generation_time) ) crl_certificates = self.revoked_certificates.all() for cert in crl_certificates: revoked_cert = ( x509.RevokedCertificateBuilder() .serial_number(int(cert.certificate.serial_number, 16)) .revocation_date(cert.revoked_at) .add_extension(x509.CRLReason(x509.ReasonFlags(cert.revocation_reason)), critical=False) .build() ) crl_builder = crl_builder.add_revoked_certificate(revoked_cert) hash_algorithm = self.credential.hash_algorithm if (hash_algorithm is not None and not isinstance(hash_algorithm, ( hashes.SHA224, hashes.SHA256, hashes.SHA384, hashes.SHA512, hashes.SHA3_224, hashes.SHA3_256, hashes.SHA3_384, hashes.SHA3_512 ))): err_msg = 'Cannot build the domain credential, unknown hash algorithm found.' raise ValueError(err_msg) priv_k = self.credential.get_private_key_serializer().as_crypto() crl = crl_builder.sign(private_key=priv_k, algorithm=hash_algorithm) self.crl_pem = crl.public_bytes(encoding=serialization.Encoding.PEM).decode() self.save() self.logger.info('CRL generation for CA %s finished.', self.unique_name) except Exception: self.logger.exception('CRL generation for CA %s failed', self.unique_name) return False return True
@property
[docs] def signature_suite(self) -> oid.SignatureSuite: """The signature suite for the CA public key certificate.""" return oid.SignatureSuite.from_certificate(self.credential.get_certificate_serializer().as_crypto())
@property
[docs] def public_key_info(self) -> oid.PublicKeyInfo: """The public key info for the CA certificate's public key.""" return self.signature_suite.public_key_info
[docs] def get_issued_certificates(self) -> QuerySet[CertificateModel, CertificateModel]: """Returns certificates issued by this CA, except its own in case of a self-signed CA. This goes through all active certificates and checks issuance by this CA based on cert.issuer_public_bytes == ca.subject_public_bytes WARNING: This means that it may inadvertently return certificates that were issued by a different CA with the same subject name """ ca_subject_public_bytes = self.credential.certificate.subject_public_bytes # do not return self-signed CA certificate return CertificateModel.objects.filter(issuer_public_bytes=ca_subject_public_bytes).exclude( subject_public_bytes=ca_subject_public_bytes )
[docs] def revoke_all_issued_certificates(self, reason: str = RevokedCertificateModel.ReasonCode.UNSPECIFIED) -> None: """Revokes all certificates issued by this CA.""" qs = self.get_issued_certificates() for cert in qs: if (cert.certificate_status not in [CertificateModel.CertificateStatus.OK, CertificateModel.CertificateStatus.NOT_YET_VALID]): continue RevokedCertificateModel.objects.create(certificate=cert, revocation_reason=reason, ca=self) self.logger.info('All %i certificates issued by CA %s have been revoked.', qs.count(), self.unique_name) self.issue_crl()
[docs] def pre_delete(self) -> None: """Check for unexpired certificates issued by this CA before deleting it.""" self.logger.info('Deleting Issuing CA %s', self) qs = self.get_issued_certificates() for cert in qs: if cert.certificate_status != CertificateModel.CertificateStatus.EXPIRED: exc_msg = f'Cannot delete the Issuing CA {self} because it has issued unexpired certificate {cert}.' raise ValidationError(exc_msg)
[docs] def post_delete(self) -> None: """Deletes the credential of this CA after deleting it.""" self.logger.debug('Deleting credential of Issuing CA %s', self) self.credential.delete()