"""Module that contains the CertificateModel."""
from __future__ import annotations
import datetime
from types import MappingProxyType
from typing import Any
from cryptography import x509
from cryptography.exceptions import InvalidSignature
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import ec, rsa
from django.contrib import admin
from django.db import models, transaction
from django.utils.translation import gettext_lazy as _
from django_stubs_ext.db.models import TypedModelMeta
from trustpoint_core.oid import (
AlgorithmIdentifier,
CertificateExtensionOid,
NamedCurve,
NameOid,
PublicKeyAlgorithmOid,
PublicKeyInfo,
SignatureSuite,
)
from trustpoint_core.serializer import CertificateSerializer, PublicKeySerializer
from util.db import CustomDeleteActionModel
from pki.models.extension import (
AttributeTypeAndValue,
AuthorityInformationAccessExtension,
AuthorityKeyIdentifierExtension,
BasicConstraintsExtension,
CertificatePoliciesExtension,
CrlDistributionPointsExtension,
ExtendedKeyUsageExtension,
FreshestCrlExtension,
InhibitAnyPolicyExtension,
IssuerAlternativeNameExtension,
KeyUsageExtension,
NameConstraintsExtension,
PolicyConstraintsExtension,
SubjectAlternativeNameExtension,
SubjectDirectoryAttributesExtension,
SubjectInformationAccessExtension,
SubjectKeyIdentifierExtension,
)
from trustpoint.logger import LoggerMixin
__all__ = ['CertificateModel', 'RevokedCertificateModel']
[docs]
class CertificateModel(LoggerMixin, CustomDeleteActionModel):
"""X509 Certificate Model.
See RFC5280 for more information.
"""
[docs]
class CertificateStatus(models.TextChoices):
    """Status values reported by the ``certificate_status`` property."""

    # Reported when the certificate is neither revoked, not yet valid nor expired.
    # ``certificate_status`` returns this member as its fall-through result.
    OK = 'OK', _('OK')
    REVOKED = 'REV', _('Revoked')
    EXPIRED = 'EXP', _('Expired')
    NOT_YET_VALID = 'NYV', _('Not Yet Valid')
# ------------------------------------------------- Django Choices -------------------------------------------------
[docs]
class Version(models.IntegerChoices):
    """X509 RFC 5280 - Certificate Version."""

    # We only allow version 3 or later if any are available in the future.
    # RFC 5280 encodes v3 as the integer 2; this is the ``value`` of the
    # certificate's ``version`` attribute that gets written to the ``version`` field.
    V3 = 2, _('Version 3')
# We are explicitly and statically defining the types we want to allow to be stored to the database.
# This has the nice effect, that it is also mypy compliant without any additions.
[docs]
class SignatureAlgorithmOidChoices(models.TextChoices):
    """Dotted-string OIDs of the signature algorithms that may be stored."""

    # RSA based signature algorithms.
    RSA_MD5 = AlgorithmIdentifier.RSA_MD5.dotted_string
    RSA_SHA1 = AlgorithmIdentifier.RSA_SHA1.dotted_string
    RSA_SHA1_ALT = AlgorithmIdentifier.RSA_SHA1_ALT.dotted_string
    RSA_SHA224 = AlgorithmIdentifier.RSA_SHA224.dotted_string
    RSA_SHA256 = AlgorithmIdentifier.RSA_SHA256.dotted_string
    RSA_SHA384 = AlgorithmIdentifier.RSA_SHA384.dotted_string
    RSA_SHA512 = AlgorithmIdentifier.RSA_SHA512.dotted_string
    RSA_SHA3_224 = AlgorithmIdentifier.RSA_SHA3_224.dotted_string
    RSA_SHA3_256 = AlgorithmIdentifier.RSA_SHA3_256.dotted_string
    RSA_SHA3_384 = AlgorithmIdentifier.RSA_SHA3_384.dotted_string
    RSA_SHA3_512 = AlgorithmIdentifier.RSA_SHA3_512.dotted_string

    # ECDSA based signature algorithms.
    ECDSA_SHA1 = AlgorithmIdentifier.ECDSA_SHA1.dotted_string
    ECDSA_SHA224 = AlgorithmIdentifier.ECDSA_SHA224.dotted_string
    ECDSA_SHA256 = AlgorithmIdentifier.ECDSA_SHA256.dotted_string
    ECDSA_SHA384 = AlgorithmIdentifier.ECDSA_SHA384.dotted_string
    ECDSA_SHA512 = AlgorithmIdentifier.ECDSA_SHA512.dotted_string
    ECDSA_SHA3_224 = AlgorithmIdentifier.ECDSA_SHA3_224.dotted_string
    ECDSA_SHA3_256 = AlgorithmIdentifier.ECDSA_SHA3_256.dotted_string
    ECDSA_SHA3_384 = AlgorithmIdentifier.ECDSA_SHA3_384.dotted_string
    ECDSA_SHA3_512 = AlgorithmIdentifier.ECDSA_SHA3_512.dotted_string

    # Not a public key signature algorithm.
    PASSWORD_BASED_MAC = AlgorithmIdentifier.PASSWORD_BASED_MAC.dotted_string
[docs]
class PublicKeyAlgorithmOidChoices(models.TextChoices):
    """Dotted-string OIDs of the public key algorithms that may be stored."""

    # Elliptic curve public keys.
    ECC = PublicKeyAlgorithmOid.ECC.dotted_string
    # RSA public keys.
    RSA = PublicKeyAlgorithmOid.RSA.dotted_string
[docs]
class PublicKeyEcCurveOidChoices(models.TextChoices):
    """Public Key EC Curve OIDs."""

    # Placeholder for keys without a named curve. ``spki_ec_curve_oid`` uses it as
    # its default and ``_get_spki_info`` reports ``NamedCurve.NONE`` for RSA keys.
    NONE = NamedCurve.NONE.dotted_string

    # Prime field curves.
    SECP192R1 = NamedCurve.SECP192R1.dotted_string
    SECP224R1 = NamedCurve.SECP224R1.dotted_string
    SECP256K1 = NamedCurve.SECP256K1.dotted_string
    SECP256R1 = NamedCurve.SECP256R1.dotted_string
    SECP384R1 = NamedCurve.SECP384R1.dotted_string
    SECP521R1 = NamedCurve.SECP521R1.dotted_string

    # Brainpool curves.
    BRAINPOOLP256R1 = NamedCurve.BRAINPOOLP256R1.dotted_string
    BRAINPOOLP384R1 = NamedCurve.BRAINPOOLP384R1.dotted_string
    BRAINPOOLP512R1 = NamedCurve.BRAINPOOLP512R1.dotted_string

    # Binary field curves.
    SECT163K1 = NamedCurve.SECT163K1.dotted_string
    SECT163R2 = NamedCurve.SECT163R2.dotted_string
    SECT233K1 = NamedCurve.SECT233K1.dotted_string
    SECT233R1 = NamedCurve.SECT233R1.dotted_string
    SECT283K1 = NamedCurve.SECT283K1.dotted_string
    SECT283R1 = NamedCurve.SECT283R1.dotted_string
    SECT409K1 = NamedCurve.SECT409K1.dotted_string
    SECT409R1 = NamedCurve.SECT409R1.dotted_string
    SECT571K1 = NamedCurve.SECT571K1.dotted_string
    SECT571R1 = NamedCurve.SECT571R1.dotted_string
# ----------------------------------------------- Custom Data Fields -----------------------------------------------
[docs]
is_self_signed = models.BooleanField(
    verbose_name=_('Self-Signed'),
    null=False,
    blank=False,
)

# TODO(AlexHx8472): This is kind of a hack.     # noqa: FIX002
# The common name is already reachable through the subject relation, but a
# property could not be used for sorting, so it is duplicated into a column.
# This may be resolved later by modifying the queryset within the view.
common_name = models.CharField(
    verbose_name=_('Common Name'),
    max_length=256,
    default='',
)

# Unique per certificate; used to look up already stored certificates.
sha256_fingerprint = models.CharField(
    verbose_name=_('Fingerprint (SHA256)'),
    max_length=256,
    editable=False,
    unique=True,
)
# ------------------------------------------ Certificate Fields (Header) -------------------------------------------
# OID of the signature algorithm -> dotted_string in DB
[docs]
signature_algorithm_oid = models.CharField(
    verbose_name=_('Signature Algorithm OID'),
    max_length=256,
    editable=False,
    choices=SignatureAlgorithmOidChoices,
)

# The signature value as hex string: no prefix, all upper case, no whitespace.
signature_value = models.CharField(
    verbose_name=_('Signature Value'),
    max_length=65536,
    editable=False,
)
# ------------------------------------------ TBSCertificate Fields (Body) ------------------------------------------
# order of fields, attributes and choices follows RFC5280
# X.509 Certificate Version (RFC5280)
[docs]
version = models.PositiveSmallIntegerField(
    verbose_name=_('Version'),
    choices=Version,
    editable=False,
)

# X.509 Certificate Serial Number (RFC5280).
# This is the serial number of the certificate itself, not a part of the subject.
serial_number = models.CharField(
    verbose_name=_('Serial Number'),
    max_length=256,
    editable=False,
)

issuer = models.ManyToManyField(
    to=AttributeTypeAndValue,
    verbose_name=_('Issuer'),
    related_name='issuer',
    editable=False,
)

# The DER encoded issuer as hex string: no prefix, all upper case, no whitespace.
issuer_public_bytes = models.CharField(
    verbose_name=_('Issuer Public Bytes'),
    max_length=2048,
    editable=False,
)

# The validity entries use datetime objects with UTC timezone.
not_valid_before = models.DateTimeField(
    verbose_name=_('Not Valid Before (UTC)'),
    editable=False,
)
not_valid_after = models.DateTimeField(
    verbose_name=_('Not Valid After (UTC)'),
    editable=False,
)

# Stored as a flat set of AttributeTypeAndValue objects.
# Hence, some information is lost, for example if multiple RDN structures were used.
# This suffices for our use-case, but do not use this relation to compare
# certificate subjects. Use subject_public_bytes for that.
subject = models.ManyToManyField(
    to=AttributeTypeAndValue,
    verbose_name=_('Subject'),
    related_name='subject',
    editable=False,
)

# The DER encoded subject as hex string: no prefix, all upper case, no whitespace.
subject_public_bytes = models.CharField(
    verbose_name=_('Subject Public Bytes'),
    max_length=2048,
    editable=False,
)

# Subject Public Key Info - Algorithm OID
spki_algorithm_oid = models.CharField(
    verbose_name=_('Public Key Algorithm OID'),
    max_length=256,
    editable=False,
    choices=PublicKeyAlgorithmOidChoices,
)

# Subject Public Key Info - Algorithm Name
spki_algorithm = models.CharField(
    verbose_name=_('Public Key Algorithm'),
    max_length=256,
    editable=False,
)

# Subject Public Key Info - Key Size
spki_key_size = models.PositiveIntegerField(
    verbose_name=_('Public Key Size'),
    editable=False,
)

# Subject Public Key Info - Curve OID if ECC, the NONE choice otherwise
spki_ec_curve_oid = models.CharField(
    verbose_name=_('Public Key Curve OID (ECC)'),
    max_length=256,
    editable=False,
    choices=PublicKeyEcCurveOidChoices,
    default=PublicKeyEcCurveOidChoices.NONE,
)

# Subject Public Key Info - Curve Name if ECC
# NOTE(review): default=None on a column that is not declared nullable; this
# appears to rely on _save_certificate always supplying a value - confirm.
spki_ec_curve = models.CharField(
    verbose_name=_('Public Key Curve (ECC)'),
    max_length=256,
    editable=False,
    default=None,
)
# ---------------------------------------------------- Raw Data ----------------------------------------------------
[docs]
# The complete certificate, PEM encoded.
cert_pem = models.TextField(
    verbose_name=_('Certificate (PEM)'),
    editable=False,
)

# The public key of the certificate, PEM encoded in SubjectPublicKeyInfo format.
public_key_pem = models.CharField(
    verbose_name=_('Public Key (PEM, SPKI)'),
    max_length=65536,
    editable=False,
)

# ----------------------------------------- CertificateModel Creation Data -----------------------------------------

# Set once when the database row is created.
created_at = models.DateTimeField(
    verbose_name=_('Created-At'),
    auto_now_add=True,
)
# --------------------------------------------------- Extensions ---------------------------------------------------
# order of extensions follows RFC5280
[docs]
# Every extension is an optional reference to a row of its extension model.
# PROTECT prevents deleting an extension row that a certificate still references.
key_usage_extension = models.ForeignKey(
    to=KeyUsageExtension,
    verbose_name=CertificateExtensionOid.KEY_USAGE.verbose_name,
    related_name='certificates',
    editable=False,
    null=True,
    blank=True,
    on_delete=models.PROTECT,
)

subject_alternative_name_extension = models.ForeignKey(
    to=SubjectAlternativeNameExtension,
    verbose_name=CertificateExtensionOid.SUBJECT_ALTERNATIVE_NAME.verbose_name,
    related_name='certificates',
    editable=False,
    null=True,
    blank=True,
    on_delete=models.PROTECT,
)

issuer_alternative_name_extension = models.ForeignKey(
    to=IssuerAlternativeNameExtension,
    verbose_name=CertificateExtensionOid.ISSUER_ALTERNATIVE_NAME.verbose_name,
    related_name='certificates',
    editable=False,
    null=True,
    blank=True,
    on_delete=models.PROTECT,
)

basic_constraints_extension = models.ForeignKey(
    to=BasicConstraintsExtension,
    verbose_name=CertificateExtensionOid.BASIC_CONSTRAINTS.verbose_name,
    related_name='certificates',
    editable=False,
    null=True,
    blank=True,
    on_delete=models.PROTECT,
)

authority_key_identifier_extension = models.ForeignKey(
    to=AuthorityKeyIdentifierExtension,
    verbose_name=CertificateExtensionOid.AUTHORITY_KEY_IDENTIFIER.verbose_name,
    related_name='certificates',
    editable=False,
    null=True,
    blank=True,
    on_delete=models.PROTECT,
)

subject_key_identifier_extension = models.ForeignKey(
    to=SubjectKeyIdentifierExtension,
    verbose_name=CertificateExtensionOid.SUBJECT_KEY_IDENTIFIER.verbose_name,
    related_name='certificates',
    editable=False,
    null=True,
    blank=True,
    on_delete=models.PROTECT,
)

certificate_policies_extension = models.ForeignKey(
    to=CertificatePoliciesExtension,
    verbose_name=CertificateExtensionOid.CERTIFICATE_POLICIES.verbose_name,
    related_name='certificates',
    editable=False,
    null=True,
    blank=True,
    on_delete=models.PROTECT,
)

extended_key_usage_extension = models.ForeignKey(
    to=ExtendedKeyUsageExtension,
    verbose_name=CertificateExtensionOid.EXTENDED_KEY_USAGE.verbose_name,
    related_name='certificates',
    editable=False,
    null=True,
    blank=True,
    on_delete=models.PROTECT,
)

name_constraints_extension = models.ForeignKey(
    to=NameConstraintsExtension,
    related_name='certificates',
    editable=False,
    null=True,
    blank=True,
    on_delete=models.PROTECT,
)

crl_distribution_points_extension = models.ForeignKey(
    to=CrlDistributionPointsExtension,
    related_name='certificates',
    editable=False,
    null=True,
    blank=True,
    on_delete=models.PROTECT,
)

inhibit_any_policy_extension = models.ForeignKey(
    to=InhibitAnyPolicyExtension,
    null=True,
    blank=True,
    on_delete=models.PROTECT,
)

policy_constraints_extension = models.ForeignKey(
    to=PolicyConstraintsExtension,
    null=True,
    blank=True,
    on_delete=models.PROTECT,
)

subject_directory_attributes_extension = models.ForeignKey(
    to=SubjectDirectoryAttributesExtension,
    null=True,
    blank=True,
    on_delete=models.PROTECT,
)

freshest_crl_extension = models.ForeignKey(
    to=FreshestCrlExtension,
    null=True,
    blank=True,
    on_delete=models.PROTECT,
)
# ------------------------------------------ Magic and default methods -------------------------------------------
[docs]
def __repr__(self) -> str:
    """Return a short debug representation containing the common name."""
    common_name = self.common_name
    return f'Certificate(CN={common_name})'
[docs]
def __str__(self) -> str:
    """Return the common name as the human-readable representation."""
    display_name = self.common_name
    return display_name
[docs]
def save(self, *_args: Any, **_kwargs: Any) -> None:
    """Refuse direct saves to protect the integrity of the stored certificates.

    All arguments are ignored; the method exists only to catch accidental calls.

    Raises:
        NotImplementedError: Always.
    """
    raise NotImplementedError(
        '.save() must not be called directly on a Certificate instance to protect the integrity of the database. '
        'Use .save_certificate() or .save_certificate_and_key() passing the required cryptography objects.'
    )
# --------------------------------------------------- Properties ---------------------------------------------------
@property
@admin.display(description=_('Signature Algorithm'))
[docs]
def signature_algorithm(self) -> str:
    """Verbose name of the signature algorithm identified by the stored OID."""
    algorithm = AlgorithmIdentifier.from_dotted_string(self.signature_algorithm_oid)
    return algorithm.verbose_name
@property
@admin.display(description=_('Signature Padding Scheme'))
[docs]
def signature_algorithm_padding_scheme(self) -> str:
    """Name of the padding scheme of the signature algorithm.

    Returns:
        The padding scheme name, or an empty string if the algorithm identifier has no padding scheme.
    """
    algorithm = AlgorithmIdentifier.from_dotted_string(self.signature_algorithm_oid)
    scheme = algorithm.padding_scheme
    return '' if scheme is None else scheme.name
@property
[docs]
def signature_suite(self) -> SignatureSuite:
    """Signature suite derived from the stored PEM certificate."""
    # The suite is rebuilt from the PEM data on every access.
    crypto_certificate = self.get_certificate_serializer().as_crypto()
    return SignatureSuite.from_certificate(crypto_certificate)
@property
[docs]
def public_key_info(self) -> PublicKeyInfo:
    """Public key info taken from the certificate's signature suite."""
    suite = self.signature_suite
    return suite.public_key_info
@property
[docs]
def certificate_status(self) -> CertificateStatus:
    """Status of the certificate.

    A revocation entry takes precedence over the validity period. Otherwise the
    current UTC time is compared against ``not_valid_before`` and ``not_valid_after``.

    Returns:
        REVOKED, NOT_YET_VALID, EXPIRED or OK, checked in this order.
    """
    if RevokedCertificateModel.objects.filter(certificate=self).exists():
        return self.CertificateStatus.REVOKED

    # Read the clock once so that both bounds are compared against the same instant.
    now = datetime.datetime.now(datetime.UTC)
    if now < self.not_valid_before:
        return self.CertificateStatus.NOT_YET_VALID
    if now > self.not_valid_after:
        return self.CertificateStatus.EXPIRED
    return self.CertificateStatus.OK
@property
[docs]
def is_ca(self) -> bool:
    """Check if the certificate is a CA certificate.

    Returns:
        False if there is no basic constraints extension, otherwise its ``ca`` flag.
    """
    basic_constraints = self.basic_constraints_extension
    if basic_constraints is None:
        return False
    return basic_constraints.ca
@property
[docs]
def is_root_ca(self) -> bool:
    """Check if the certificate is a self-signed CA certificate."""
    self_signed = self.is_self_signed
    # ``is_ca`` is only evaluated for self-signed certificates.
    return self.is_ca if self_signed else self_signed
@property
[docs]
def is_end_entity(self) -> bool:
    """Check if the certificate is an end entity certificate, that is not a CA certificate."""
    if self.is_ca:
        return False
    return True
@classmethod
[docs]
def get_cert_by_sha256_fingerprint(cls, sha256_fingerprint: str) -> None | CertificateModel:
    """Get a CertificateModel instance by its SHA256 fingerprint.

    Args:
        sha256_fingerprint: The fingerprint as hex string; it is upper-cased before the lookup.

    Returns:
        The first matching CertificateModel, or None if there is no match.
    """
    normalized_fingerprint = sha256_fingerprint.upper()
    matches = cls.objects.filter(sha256_fingerprint=normalized_fingerprint)
    return matches.first()
@staticmethod
[docs]
def _get_subject(cert: x509.Certificate) -> list[tuple[str, str]]:
    """Flatten the subject of the certificate into (OID dotted string, value) pairs.

    Values of type bytes are converted to their hex representation. The pairs keep
    the order of the RDNs and of the attributes within each RDN.
    """
    return [
        (
            attribute.oid.dotted_string,
            attribute.value.hex() if isinstance(attribute.value, bytes) else attribute.value,
        )
        for rdn in cert.subject.rdns
        for attribute in rdn
    ]
@staticmethod
[docs]
def _get_issuer(cert: x509.Certificate) -> list[tuple[str, str]]:
    """Flatten the issuer of the certificate into (OID dotted string, value) pairs.

    Values of type bytes are converted to their hex representation. The pairs keep
    the order of the RDNs and of the attributes within each RDN.
    """
    return [
        (
            attribute.oid.dotted_string,
            attribute.value.hex() if isinstance(attribute.value, bytes) else attribute.value,
        )
        for rdn in cert.issuer.rdns
        for attribute in rdn
    ]
@staticmethod
[docs]
def _get_spki_info(cert: x509.Certificate) -> tuple[PublicKeyAlgorithmOid, int, NamedCurve]:
    """Determine public key algorithm, key size and named curve of the certificate.

    Returns:
        The public key algorithm OID, the key size and the named curve.
        For RSA keys the curve is NamedCurve.NONE.

    Raises:
        TypeError: If the public key is neither an RSA nor an EC public key.
    """
    public_key = cert.public_key()

    if isinstance(public_key, rsa.RSAPublicKey):
        return PublicKeyAlgorithmOid.RSA, public_key.key_size, NamedCurve.NONE

    if isinstance(public_key, ec.EllipticCurvePublicKey):
        # NamedCurve members are looked up by the upper-cased curve name.
        named_curve = NamedCurve[public_key.curve.name.upper()]
        return PublicKeyAlgorithmOid.ECC, public_key.key_size, named_curve

    raise TypeError('Subject Public Key Info contains an unsupported key type.')
# --------------------------------------------- Data Retrieval Methods ---------------------------------------------
[docs]
def get_certificate_serializer(self) -> CertificateSerializer:
    """Build a CertificateSerializer from the stored PEM certificate."""
    pem_bytes = self.cert_pem.encode()
    return CertificateSerializer.from_pem(pem_bytes)
[docs]
def get_public_key_serializer(self) -> PublicKeySerializer:
    """Build a PublicKeySerializer from the stored PEM public key."""
    pem_bytes = self.public_key_pem.encode()
    return PublicKeySerializer.from_pem(pem_bytes)
# ---------------------------------------------- Private save methods ----------------------------------------------
[docs]
def _save(self, **kwargs: Any) -> None:
    """Persist the instance through the parent class, bypassing the blocked ``save()``."""
    # The public save() of this model always raises, so the internal save
    # helpers use this method to reach the inherited implementation.
    parent = super()
    return parent.save(**kwargs)
@classmethod
[docs]
def _save_certificate(cls, certificate: x509.Certificate | CertificateSerializer) -> CertificateModel:
    """Store the certificate in the database unless it is already stored.

    Args:
        certificate: The certificate as cryptography object or as CertificateSerializer.

    Returns:
        The stored model with the same SHA256 fingerprint if there is one,
        otherwise the newly created and saved model.

    Raises:
        TypeError: If the public key is neither an RSA nor an EC key (raised by _get_spki_info).
    """
    if isinstance(certificate, CertificateSerializer):
        certificate = certificate.as_crypto()

    # --------------------------------------- Custom Data Fields / Exist Check -------------------------------------

    # Hash the certificate once; the same upper case hex value is used for the lookup and stored.
    sha256_fingerprint = certificate.fingerprint(algorithm=hashes.SHA256()).hex().upper()

    certificate_in_db = cls.get_cert_by_sha256_fingerprint(sha256_fingerprint)
    if certificate_in_db is not None:
        return certificate_in_db

    # ---------------------------------------- Certificate Fields (Header) -----------------------------------------

    signature_algorithm_oid = certificate.signature_algorithm_oid.dotted_string
    signature_value = certificate.signature.hex().upper()

    # ---------------------------------------- TBSCertificate Fields (Body) ----------------------------------------

    version = certificate.version.value
    # Upper case hex without the '0x' prefix.
    serial_number = hex(certificate.serial_number)[2:].upper()

    issuer = cls._get_issuer(certificate)
    issuer_public_bytes = certificate.issuer.public_bytes().hex().upper()

    not_valid_before = certificate.not_valid_before_utc
    not_valid_after = certificate.not_valid_after_utc

    subject = cls._get_subject(certificate)
    subject_public_bytes = certificate.subject.public_bytes().hex().upper()

    spki_algorithm_oid, spki_key_size, spki_ec_curve_oid = cls._get_spki_info(certificate)

    # A certificate that verifies as directly issued by itself is treated as self-signed;
    # any of the caught verification errors means that it is not.
    try:
        certificate.verify_directly_issued_by(certificate)
        is_self_signed = True
    except (ValueError, TypeError, InvalidSignature):
        is_self_signed = False

    # -------------------------------------------------- Raw Data --------------------------------------------------

    cert_pem = certificate.public_bytes(encoding=serialization.Encoding.PEM).decode()
    public_key_pem = (
        certificate.public_key()
        .public_bytes(encoding=serialization.Encoding.PEM, format=serialization.PublicFormat.SubjectPublicKeyInfo)
        .decode()
    )

    # ----------------------------------------- Certificate Model Instance -----------------------------------------

    cert_model = CertificateModel(
        sha256_fingerprint=sha256_fingerprint,
        signature_algorithm_oid=signature_algorithm_oid,
        signature_value=signature_value,
        version=version,
        serial_number=serial_number,
        issuer_public_bytes=issuer_public_bytes,
        not_valid_before=not_valid_before,
        not_valid_after=not_valid_after,
        subject_public_bytes=subject_public_bytes,
        spki_algorithm_oid=spki_algorithm_oid.dotted_string,
        spki_algorithm=spki_algorithm_oid.name,
        spki_key_size=spki_key_size,
        spki_ec_curve_oid=spki_ec_curve_oid.dotted_string,
        spki_ec_curve=spki_ec_curve_oid.verbose_name,
        cert_pem=cert_pem,
        public_key_pem=public_key_pem,
        is_self_signed=is_self_signed,
    )

    # --------------------------------------------- Store in DataBase ----------------------------------------------

    return cls._atomic_save(cert_model=cert_model, certificate=certificate, subject=subject, issuer=issuer)
@staticmethod
[docs]
def _save_attribute_and_value_pairs(oid: str, value: str) -> AttributeTypeAndValue:
    """Return the AttributeTypeAndValue for the OID / value pair, creating it if it does not exist yet."""
    attribute = AttributeTypeAndValue.objects.filter(oid=oid, value=value).first()
    if not attribute:
        # No stored pair yet: create and persist a new one.
        attribute = AttributeTypeAndValue(oid=oid, value=value)
        attribute.save()
    return attribute
@classmethod
[docs]
def _save_subject(cls, cert_model: CertificateModel, subject: list[tuple[str, str]]) -> None:
    """Add every (OID, value) pair of the subject to the subject relation of cert_model."""
    for oid, value in subject:
        attribute = cls._save_attribute_and_value_pairs(oid=oid, value=value)
        cert_model.subject.add(attribute)
@classmethod
[docs]
def _save_issuer(cls, cert_model: CertificateModel, issuer: list[tuple[str, str]]) -> None:
    """Add every (OID, value) pair of the issuer to the issuer relation of cert_model."""
    for oid, value in issuer:
        attribute = cls._save_attribute_and_value_pairs(oid=oid, value=value)
        cert_model.issuer.add(attribute)
[docs]
# Read-only mapping from a cryptography extension value type to a tuple of
# (name of the attribute on this model, extension model class).
# _save_extensions uses it to pick the extension model that stores an extension
# and the attribute the stored extension is assigned to.
EXTENSION_MAP = MappingProxyType(
    {
        x509.BasicConstraints: ('basic_constraints_extension', BasicConstraintsExtension),
        x509.KeyUsage: ('key_usage_extension', KeyUsageExtension),
        x509.IssuerAlternativeName: ('issuer_alternative_name_extension', IssuerAlternativeNameExtension),
        x509.SubjectAlternativeName: ('subject_alternative_name_extension', SubjectAlternativeNameExtension),
        x509.AuthorityKeyIdentifier: ('authority_key_identifier_extension', AuthorityKeyIdentifierExtension),
        x509.SubjectKeyIdentifier: ('subject_key_identifier_extension', SubjectKeyIdentifierExtension),
        x509.CertificatePolicies: ('certificate_policies_extension', CertificatePoliciesExtension),
        x509.ExtendedKeyUsage: ('extended_key_usage_extension', ExtendedKeyUsageExtension),
        x509.NameConstraints: ('name_constraints_extension', NameConstraintsExtension),
        x509.CRLDistributionPoints: ('crl_distribution_points_extension', CrlDistributionPointsExtension),
        x509.AuthorityInformationAccess: (
            'authority_information_access_extension',
            AuthorityInformationAccessExtension,
        ),
        x509.SubjectInformationAccess: ('subject_information_access_extension', SubjectInformationAccessExtension),
        x509.InhibitAnyPolicy: ('inhibit_any_policy_extension', InhibitAnyPolicyExtension),
        x509.PolicyConstraints: ('policy_constraints_extension', PolicyConstraintsExtension),
        x509.FreshestCRL: ('freshest_crl_extension', FreshestCrlExtension),
    }
)
@staticmethod
[docs]
def _save_extensions(cert_model: CertificateModel, cert: x509.Certificate) -> None:
    """Store the supported extensions of cert and assign them to cert_model.

    Extensions whose value type is not listed in EXTENSION_MAP are skipped.
    The attributes are only assigned here; cert_model itself is not saved.
    """
    for extension in cert.extensions:
        # Pick the first map entry whose type matches the extension value.
        target = next(
            (
                (field_name, extension_model_class)
                for x509_extension_type, (field_name, extension_model_class) in CertificateModel.EXTENSION_MAP.items()
                if isinstance(extension.value, x509_extension_type)
            ),
            None,
        )
        if target is None:
            continue
        field_name, extension_model_class = target
        setattr(cert_model, field_name, extension_model_class.save_from_crypto_extensions(extension))
@classmethod
@transaction.atomic
[docs]
def _atomic_save(
    cls,
    cert_model: CertificateModel,
    certificate: x509.Certificate,
    subject: list[tuple[str, str]],
    issuer: list[tuple[str, str]],
) -> CertificateModel:
    """Save cert_model together with its subject, issuer and extensions.

    Args:
        cert_model: The not yet saved model instance.
        certificate: The certificate the extensions are taken from.
        subject: The subject as (OID, value) pairs.
        issuer: The issuer as (OID, value) pairs.

    Returns:
        The saved cert_model.
    """
    # First save: the subject / issuer relations are filled afterwards.
    cert_model._save()

    # If the subject contains several common names, the last one is used.
    common_names = [value for oid, value in subject if oid == NameOid.COMMON_NAME.dotted_string]
    if common_names:
        cert_model.common_name = common_names[-1]

    cls._save_subject(cert_model, subject)
    cls._save_issuer(cert_model, issuer)
    cls._save_extensions(cert_model, certificate)

    # Second save: persists the common name and the assigned extensions.
    cert_model._save()
    return cert_model
# ---------------------------------------------- Public save methods -----------------------------------------------
@classmethod
[docs]
def save_certificate(cls, certificate: x509.Certificate | CertificateSerializer) -> CertificateModel:
    """Store the certificate in the database.

    Args:
        certificate: The certificate as cryptography object or as CertificateSerializer.

    Returns:
        CertificateModel: The model returned by ``_save_certificate`` for this certificate.
    """
    stored_certificate = cls._save_certificate(certificate=certificate)
    return stored_certificate
# ---------------------------------------------- Post-deletion cleanup ---------------------------------------------
[docs]
def pre_delete(self) -> None:
    """Remember the related extension objects before deletion.

    The objects are kept in ``self._related_objects``, keyed by attribute name,
    so that ``post_delete`` can clean them up.

    NOTE(review): ``authority_information_access_extension`` and
    ``subject_information_access_extension`` are read here, but their field
    declarations are not visible in this part of the file - confirm they exist.
    """
    extension_attributes = (
        'basic_constraints_extension',
        'key_usage_extension',
        'issuer_alternative_name_extension',
        'subject_alternative_name_extension',
        'authority_key_identifier_extension',
        'subject_key_identifier_extension',
        'certificate_policies_extension',
        'extended_key_usage_extension',
        'name_constraints_extension',
        'crl_distribution_points_extension',
        'authority_information_access_extension',
        'subject_information_access_extension',
        'inhibit_any_policy_extension',
        'policy_constraints_extension',
        'freshest_crl_extension',
    )
    self._related_objects = {name: getattr(self, name) for name in extension_attributes}
[docs]
def post_delete(self) -> None:
    """Clean up related orphaned extension models.

    Passes every object remembered in ``self._related_objects`` to the
    ``delete_if_orphaned`` method of its extension model class.
    """
    cleanup_plan = (
        (BasicConstraintsExtension, 'basic_constraints_extension'),
        (KeyUsageExtension, 'key_usage_extension'),
        (IssuerAlternativeNameExtension, 'issuer_alternative_name_extension'),
        (SubjectAlternativeNameExtension, 'subject_alternative_name_extension'),
        (AuthorityKeyIdentifierExtension, 'authority_key_identifier_extension'),
        (SubjectKeyIdentifierExtension, 'subject_key_identifier_extension'),
        (CertificatePoliciesExtension, 'certificate_policies_extension'),
        (ExtendedKeyUsageExtension, 'extended_key_usage_extension'),
        (NameConstraintsExtension, 'name_constraints_extension'),
        (CrlDistributionPointsExtension, 'crl_distribution_points_extension'),
        (AuthorityInformationAccessExtension, 'authority_information_access_extension'),
        (SubjectInformationAccessExtension, 'subject_information_access_extension'),
        (InhibitAnyPolicyExtension, 'inhibit_any_policy_extension'),
        (PolicyConstraintsExtension, 'policy_constraints_extension'),
        (FreshestCrlExtension, 'freshest_crl_extension'),
    )
    for extension_model_class, attribute_name in cleanup_plan:
        extension_model_class.delete_if_orphaned(self._related_objects[attribute_name])
# ---------------------------------------------- Utility ---------------------------------------------
[docs]
def subjects_match(self, other_subject: x509.Name) -> bool:
    """Check if the provided subject is identical to the one of this certificate.

    The comparison is done on the encoded subject as upper case hex string.

    Args:
        other_subject (x509.Name): The subject to compare to.

    Returns:
        bool: True if the subjects match, False otherwise.
    """
    other_public_bytes = other_subject.public_bytes().hex().upper()
    return self.subject_public_bytes == other_public_bytes
[docs]
class RevokedCertificateModel(models.Model):
    """Revocation entry of a single certificate."""

    class ReasonCode(models.TextChoices):
        """Revocation reasons per RFC 5280."""

        UNSPECIFIED = 'unspecified', _('Unspecified')
        KEY_COMPROMISE = 'keyCompromise', _('Key Compromise')
        CA_COMPROMISE = 'cACompromise', _('CA Compromise')
        AFFILIATION_CHANGED = 'affiliationChanged', _('Affiliation Changed')
        SUPERSEDED = 'superseded', _('Superseded')
        CESSATION = 'cessationOfOperation', _('Cessation of Operation')
        CERTIFICATE_HOLD = 'certificateHold', _('Certificate Hold')
        PRIVILEGE_WITHDRAWN = 'privilegeWithdrawn', _('Privilege Withdrawn')
        AA_COMPROMISE = 'aACompromise', _('AA Compromise')
        REMOVE_FROM_CRL = 'removeFromCRL', _('Remove from CRL')

    # At most one revocation entry per certificate; it is deleted together with the certificate.
    certificate = models.OneToOneField(
        to=CertificateModel,
        verbose_name=_('Certificate'),
        related_name='revoked_certificate',
        on_delete=models.CASCADE,
    )

    # Set once when the entry is created.
    revoked_at = models.DateTimeField(
        verbose_name=_('Revocation Date'),
        auto_now_add=True,
    )

    revocation_reason = models.TextField(
        verbose_name=_('Revocation Reason'),
        choices=ReasonCode,
        default=ReasonCode.UNSPECIFIED,
    )

    # The entry is kept with an empty CA reference when the issuing CA is deleted.
    ca = models.ForeignKey(
        to='IssuingCaModel',
        verbose_name=_('Issuing CA'),
        related_name='revoked_certificates',
        on_delete=models.SET_NULL,  # Safe to remove CRL if CA is removed?
        null=True,
    )

    def __str__(self) -> str:
        """String representation of the RevokedCertificateModel instance."""
        common_name = self.certificate.common_name
        return f'RevokedCertificate({common_name})'