"""This modules contains all models specific to the device abstractions."""
from __future__ import annotations
import datetime
import secrets
from typing import TYPE_CHECKING
from cryptography.hazmat.primitives import hashes
from django.core.exceptions import ValidationError
from django.db import models
from django.utils import timezone
from django.utils.translation import gettext_lazy as _
from django_stubs_ext.db.models import TypedModelMeta
from pki.models.certificate import CertificateModel, RevokedCertificateModel
from pki.models.credential import CredentialModel
from pki.models.domain import DomainModel
from pki.models.issuing_ca import IssuingCaModel
from pki.models.truststore import TruststoreModel
from pyasn1_modules.rfc3280 import common_name # type: ignore[import-untyped]
from trustpoint_core import oid
from util.db import CustomDeleteActionModel
if TYPE_CHECKING:
from typing import Any
from cryptography import x509
__all__ = [
'DeviceModel',
'IssuedCredentialModel',
'RemoteDeviceCredentialDownloadModel',
]
class OnboardingStatus(models.IntegerChoices):
"""The onboarding status."""
PENDING = 1, _('Pending')
ONBOARDED = 2, _('Onboarded')
class OnboardingProtocol(models.IntegerChoices):
"""Choices of onboarding protocols."""
MANUAL = 0, _('Manual Onboarding')
CMP_IDEVID = 1, _('CMP - IDevID')
CMP_SHARED_SECRET = 2, _('CMP - Shared Secret')
EST_IDEVID = 3, _('EST - IDevID')
EST_USERNAME_PASSWORD = 4, _('EST - Username & Password')
AOKI = 5, _('AOKI')
BRSKI = 6, _('BRSKI')
class OnboardingPkiProtocol(models.IntegerChoices):
"""Choices for onboarding pki protocols."""
# Bitmask: Only use powers of 2: 1, 2, 4, 8, 16 ...
CMP = 1, _('CMP')
EST = 2, _('EST')
class NoOnboardingPkiProtocol(models.IntegerChoices):
"""Choices for no onboarding pki protocols."""
# Bitmask: Only use powers of 2: 1, 2, 4, 8, 16 ...
CMP_SHARED_SECRET = 1, _('CMP - Shared Secret (HMAC)')
# 2 reserved for CMP Client Certificate
EST_USERNAME_PASSWORD = 4, _('EST - Username & Password')
# 8 reserved for EST Client Certificate
MANUAL = 16, _('Manual')
class AbstractPkiProtocolModel[T: models.IntegerChoices]:
"""Extends a model for IntegerChoices stored as bitwise flags."""
pki_protocol_class: type[T]
def add_pki_protocol(self, pki_protocol: T) -> None:
"""Adds the provided PkiProtocol to the allowed protocols for onboarded devices.
Args:
pki_protocol: The PkiProtocol to allow.
"""
self.pki_protocols |= pki_protocol.value
def remove_pki_protocol(self, pki_protocol: T) -> None:
"""Removes the provided PkiProtocol from the allowed protocols if it is allowed..
Args:
pki_protocol: The PkiProtocol to forbid.
"""
self.pki_protocols &= ~pki_protocol
def clear_pki_protocols(self) -> None:
"""Clears all allowed PkiProtocols, that is, it deactivates application certificate issuance."""
self.pki_protocols = 0
def has_pki_protocol(self, pki_protocol: T) -> bool:
"""Checks if the provided PkiProtocol is allowed.
Args:
pki_protocol: The PkiProtocol that is checked against the allowed ones.
Returns:
Returns True if the provided PkiProtocol is allowed, False otherwise.
"""
return (self.pki_protocols & pki_protocol) == pki_protocol
def get_pki_protocols(self) -> list[T]:
"""Gets all allowed PkiProtocols.
Returns:
Retruns the allowed PkiProtocols as list.
"""
return [pki_protocol for pki_protocol in self.pki_protocol_class if self.has_pki_protocol(pki_protocol)]
def set_pki_protocols(self, pki_protocols: list[T]) -> None:
"""Sets all allowed PkiProtocols exactly matching the provided list."""
self.clear_pki_protocols()
for pki_protocol in pki_protocols:
self.add_pki_protocol(pki_protocol)
class OnboardingConfigModel(AbstractPkiProtocolModel[OnboardingPkiProtocol], models.Model):
"""Onboarding Configuration Model."""
pki_protocol_class = OnboardingPkiProtocol
pki_protocols = models.PositiveIntegerField(
verbose_name=_('Pki Protocol Bitwise Flag'), null=False, blank=True, default=0
)
onboarding_status = models.IntegerField(
choices=OnboardingStatus,
verbose_name=_('Onboarding Status'),
null=False,
blank=False,
default=OnboardingStatus.PENDING,
)
onboarding_protocol = models.PositiveIntegerField(
choices=OnboardingProtocol,
verbose_name=_('Onboarding Protocol'),
null=False,
blank=False,
)
# these will be dropped after successfull onboarding
est_password = models.CharField(verbose_name=_('EST Password'), max_length=128, blank=True, default='')
cmp_shared_secret = models.CharField(verbose_name=_('CMP Shared Secret'), max_length=128, blank=True, default='')
idevid_trust_store = models.ForeignKey(
TruststoreModel,
verbose_name=_('IDevID Manufacturer Truststore'),
null=True,
blank=True,
on_delete=models.SET_NULL,
)
def __str__(self) -> str:
"""Gets the model instance as human-readable string."""
return (
'OnboardingConfigModel('
f'onboarding_status:{OnboardingStatus(self.onboarding_status).label}, '
f'onboarding_protocol:{OnboardingProtocol(self.onboarding_protocol).label}, '
f'cmp_shared_secret:{bool(self.cmp_shared_secret)}, '
f'est_password:{bool(self.est_password)})'
)
def save(self, *args: Any, **kwargs: Any) -> None:
"""Executes full_clean() before saving.
Args:
*args: Positional arguments are passed to super().save().
**kwargs: Keyword arguments are passed to super().save().
"""
self.full_clean()
super().save(*args, **kwargs)
def clean(self) -> None:
"""Validation before saving the model."""
error_messages = None
match self.onboarding_protocol:
case OnboardingProtocol.MANUAL:
error_messages = self._validate_case_manual_onboarding()
case OnboardingProtocol.CMP_IDEVID:
error_messages = self._validate_case_cmp_idevid_onboarding()
case OnboardingProtocol.CMP_SHARED_SECRET:
error_messages = self._validate_case_cmp_shared_secret_onboarding()
case OnboardingProtocol.EST_IDEVID:
error_messages = self._validate_case_est_idevid_onboarding()
case OnboardingProtocol.EST_USERNAME_PASSWORD:
error_messages = self._validate_case_est_username_password_onboarding()
case OnboardingProtocol.AOKI:
err_msg = 'AOKI is not yet supported as onboarding protocol.'
raise ValidationError(err_msg)
case OnboardingProtocol.BRSKI:
err_msg = 'BRSKI is not yet supported as onboarding protocol.'
raise ValidationError(err_msg)
case _:
err_msg = f'Unknown onboarding protocol found: {self.onboarding_protocol}.'
raise ValidationError(err_msg)
if error_messages:
raise ValidationError(error_messages)
def _validate_case_manual_onboarding(self) -> dict[str, str]:
"""Validates case OnboardingProtocol.MANUAL.
Args:
error_messages: The container that gathers all error messages.
Returns:
The error_messages gathered.
"""
error_messages = {}
if self.est_password != '':
error_messages['est_password'] = 'EST password must not be set for manual onboarding.' # noqa: S105
if self.cmp_shared_secret != '':
error_messages['cmp_shared_secret'] = 'CMP shared-secret must not be set for manual onboarding.' # noqa: S105
if self.idevid_trust_store is not None:
error_messages['idevid_trust_store'] = 'IDevID truststore must not be set for manual onboarding.'
return error_messages
def _validate_case_cmp_idevid_onboarding(self) -> dict[str, str]:
"""Validates case OnboardingProtocol.CMP_IDEVID.
Args:
error_messages: The container that gathers all error messages.
Returns:
The error_messages gathered.
"""
return {}
def _validate_case_cmp_shared_secret_onboarding(self) -> dict[str, str]:
"""Validates case OnboardingProtocol.CMP_SHARED_SECRET.
Args:
error_messages: The container that gathers all error messages.
Returns:
The error_messages gathered.
"""
error_messages = {}
if self.est_password != '':
error_messages['est_password'] = 'EST password must not be set for CMP shared-secret onboarding.' # noqa: S105
if self.idevid_trust_store is not None:
error_messages['idevid_trust_store'] = 'IDevID truststore must not be set for CMP shared-secret onboarding.'
return error_messages
def _validate_case_est_idevid_onboarding(self) -> dict[str, str]:
"""Validates case OnboardingProtocol.EST_IDEVID.
Args:
error_messages: The container that gathers all error messages.
Returns:
The error_messages gathered.
"""
error_messages = {}
if self.est_password != '':
error_messages['est_password'] = 'EST password must not be set for EST IDevID onboarding.' # noqa: S105
if self.cmp_shared_secret != '':
error_messages['cmp_shared_secret'] = 'CMP shared-secret must not be set for EST IDevID onboarding.' # noqa: S105
# idevid_trust_store can be left blank while this would ofcourse mean no onboarding is possible.
return error_messages
def _validate_case_est_username_password_onboarding(self) -> dict[str, str]:
"""Validates case OnboardingProtocol.EST_USERNAME_PASSWORD.
Args:
error_messages: The container that gathers all error messages.
Returns:
The error_messages gathered.
"""
error_messages = {}
if self.cmp_shared_secret != '':
error_messages['cmp_shared_secret'] = (
'CMP shared-secret must not be set for EST username / password onboarding.' # noqa: S105
)
if self.idevid_trust_store is not None:
error_messages['idevid_trust_store'] = (
'IDevID truststore must not be set for EST username / password onboarding.'
)
return error_messages
class NoOnboardingConfigModel(AbstractPkiProtocolModel[NoOnboardingPkiProtocol], models.Model):
"""No Onboarding Configuration Model."""
pki_protocol_class = NoOnboardingPkiProtocol
pki_protocols = models.PositiveIntegerField(
verbose_name=_('Pki Protocol Bitwise Flag'), null=False, blank=True, default=0
)
est_password = models.CharField(verbose_name=_('EST Password'), max_length=128, blank=True, default='')
cmp_shared_secret = models.CharField(verbose_name=_('CMP Shared Secret'), max_length=128, blank=True, default='')
def __str__(self) -> str:
"""Gets the model instance as human-readable string."""
return (
'NoOnboardingConfigModel('
f'cmp_shared_secret:{bool(self.cmp_shared_secret)}'
f'est_password:{bool(self.est_password)})'
)
def save(self, *args: Any, **kwargs: Any) -> None:
"""Executes full_clean() before saving.
Args:
*args: Positional arguments are passed to super().save().
**kwargs: Keyword arguments are passed to super().save().
"""
self.full_clean()
super().save(*args, **kwargs)
def clean(self) -> None:
"""Validation before saving the model."""
error_messages = {}
if self.cmp_shared_secret != '' and not self.has_pki_protocol(NoOnboardingPkiProtocol.CMP_SHARED_SECRET):
error_messages['cmp_shared_secret'] = (
'CMP shared-secret must not be set if EST_USERNAME_PASSWORD is not enabled.' # noqa: S105
)
if self.cmp_shared_secret == '' and self.has_pki_protocol(NoOnboardingPkiProtocol.CMP_SHARED_SECRET):
error_messages['cmp_shared_secret'] = 'CMP shared-secret must be set if EST_USERNAME_PASSWORD is enabled.' # noqa: S105
if self.est_password != '' and not self.has_pki_protocol(NoOnboardingPkiProtocol.EST_USERNAME_PASSWORD):
error_messages['est_password'] = 'EST password must not be set if EST_USERNAME_PASSWORD is not enabled.' # noqa: S105
if self.est_password == '' and self.has_pki_protocol(NoOnboardingPkiProtocol.EST_USERNAME_PASSWORD):
error_messages['est_password'] = 'EST password must be set if EST_USERNAME_PASSWORD is enabled.' # noqa: S105
[docs]
class DeviceModel(CustomDeleteActionModel):
"""The DeviceModel."""
[docs]
common_name = models.CharField(_('Device'), max_length=100, default='', unique=True)
[docs]
serial_number = models.CharField(_('Serial-Number'), max_length=100, default='', blank=True, null=False)
[docs]
domain = models.ForeignKey(
DomainModel, verbose_name=_('Domain'), related_name='devices', blank=True, null=True, on_delete=models.PROTECT
)
[docs]
onboarding_config = models.ForeignKey(
OnboardingConfigModel,
verbose_name=_('Onboarding Config'),
related_name='device',
blank=True,
null=True,
on_delete=models.PROTECT,
)
[docs]
no_onboarding_config = models.ForeignKey(
NoOnboardingConfigModel,
verbose_name=_('No Onboarding Config'),
related_name='device',
blank=True,
null=True,
on_delete=models.PROTECT,
)
[docs]
class DeviceType(models.IntegerChoices):
"""Enum for device type."""
[docs]
GENERIC_DEVICE = 0, _('Generic Device')
[docs]
OPC_UA_GDS = 1, _('OPC UA GDS')
[docs]
device_type = models.IntegerField(
choices=DeviceType,
verbose_name=_('Device Type'),
default=DeviceType.GENERIC_DEVICE,
)
[docs]
created_at = models.DateTimeField(verbose_name=_('Created'), auto_now_add=True)
[docs]
def __str__(self) -> str:
"""Returns a human-readable string representation."""
return f'DeviceModel(common_name={self.common_name})'
[docs]
def pre_delete(self) -> None:
"""Delete all issued credentials for this device before deleting the device itself."""
self.issued_credentials.all().delete()
@property
[docs]
def est_username(self) -> str:
"""Gets the EST username."""
return self.common_name
@property
[docs]
def signature_suite(self) -> oid.SignatureSuite | None:
"""Gets the corresponding SignatureSuite object."""
if self.domain is None:
return None
return oid.SignatureSuite.from_certificate(
self.domain.get_issuing_ca_or_value_error().credential.get_certificate_serializer().as_crypto()
)
@property
[docs]
def public_key_info(self) -> oid.PublicKeyInfo | None:
"""Gets the corresponding PublicKeyInfo object."""
if self.signature_suite is None:
return None
return self.signature_suite.public_key_info
[docs]
def clean(self) -> None:
"""Validation before saving the model."""
if not (self.onboarding_config or self.no_onboarding_config):
err_msg = 'Either onboarding or no-onboarding has to be configured.'
raise ValidationError(err_msg)
if self.onboarding_config and self.no_onboarding_config:
err_msg = 'Only one of onboarding or no-onboarding can be configured.'
raise ValidationError(err_msg)
[docs]
class IssuedCredentialModel(CustomDeleteActionModel):
"""Model for all credentials and certificates that have been issued or requested by the Trustpoint."""
[docs]
class IssuedCredentialType(models.IntegerChoices):
"""The type of the credential."""
[docs]
DOMAIN_CREDENTIAL = 0, _('Domain Credential')
[docs]
APPLICATION_CREDENTIAL = 1, _('Application Credential')
[docs]
class IssuedCredentialPurpose(models.IntegerChoices):
"""The purpose of the issued credential."""
[docs]
DOMAIN_CREDENTIAL = 0, _('Domain Credential')
[docs]
GENERIC = 1, _('Generic')
[docs]
TLS_CLIENT = 2, _('TLS-Client')
[docs]
TLS_SERVER = 3, _('TLS-Server')
[docs]
OPCUA_CLIENT = 4, _('OpcUa-Client')
[docs]
OPCUA_SERVER = 5, _('OpcUa-Server')
[docs]
id = models.AutoField(primary_key=True)
[docs]
common_name = models.CharField(verbose_name=_('Common Name'), max_length=255)
[docs]
issued_credential_type = models.IntegerField(choices=IssuedCredentialType, verbose_name=_('Credential Type'))
[docs]
issued_credential_purpose = models.IntegerField(
choices=IssuedCredentialPurpose, verbose_name=_('Credential Purpose')
)
[docs]
credential = models.OneToOneField(
CredentialModel,
verbose_name=_('Credential'),
on_delete=models.CASCADE,
related_name='issued_credential',
null=False,
blank=False,
)
[docs]
device = models.ForeignKey(
DeviceModel, verbose_name=_('Device'), on_delete=models.PROTECT, related_name='issued_credentials'
)
[docs]
domain = models.ForeignKey(
DomainModel, verbose_name=_('Domain'), on_delete=models.PROTECT, related_name='issued_credentials'
)
[docs]
created_at = models.DateTimeField(verbose_name=_('Created'), auto_now_add=True)
[docs]
def __str__(self) -> str:
"""Returns a human-readable string representation."""
return f'IssuedCredentialModel(common_name={common_name})'
[docs]
def revoke(self) -> None:
"""Revokes all active certificates associated with this credential."""
cert: CertificateModel
for cert in self.credential.certificates.all():
status = cert.certificate_status
if status in (CertificateModel.CertificateStatus.REVOKED, CertificateModel.CertificateStatus.EXPIRED):
continue
try:
ca = IssuingCaModel.objects.get(credential__certificate__subject_public_bytes=cert.issuer_public_bytes)
except IssuingCaModel.DoesNotExist:
continue
except IssuingCaModel.MultipleObjectsReturned:
continue
RevokedCertificateModel.objects.create(
certificate=cert, revocation_reason=RevokedCertificateModel.ReasonCode.CESSATION, ca=ca
)
[docs]
def pre_delete(self) -> None:
"""Revoke all active certificates and delete the credential."""
self.revoke()
self.credential.delete() # this will also delete the IssuedCredentialModel via cascade
[docs]
def is_valid_domain_credential(self) -> tuple[bool, str]:
"""Determines if this issued credential is valid for enrolling new application credentials.
This method performs the following checks:
1. The IssuedCredentialModel type must be of type DOMAIN_CREDENTIAL.
2. The credential must be of type ISSUED_CREDENTIAL.
3. A primary certificate must exist.
4. The certificate's status must be 'OK'.
Returns:
tuple[bool, str]: A tuple where:
- The first value is True if the credential meets all criteria, False otherwise.
- The second value is a reason string explaining why the credential is invalid.
"""
if self.issued_credential_type != IssuedCredentialModel.IssuedCredentialType.DOMAIN_CREDENTIAL:
return False, 'Invalid issued credential type: Must be DOMAIN_CREDENTIAL.'
result, reason = self.credential.is_valid_issued_credential()
if not result:
return False, reason
return True, 'Valid domain credential.'
@staticmethod
[docs]
def get_credential_for_certificate(cert: x509.Certificate) -> IssuedCredentialModel:
"""Retrieve an IssuedCredentialModel instance for the given certificate.
:param cert: x509.Certificate to search for.
:return: The corresponding IssuedCredentialModel instance.
:raises ClientCertificateAuthenticationError: if no matching issued credential is found.
"""
cert_fingerprint = cert.fingerprint(hashes.SHA256()).hex().upper()
credential = CredentialModel.objects.filter(certificates__sha256_fingerprint=cert_fingerprint).first()
if not credential:
error_message = f'No credential found for certificate with fingerprint {cert_fingerprint}'
raise IssuedCredentialModel.DoesNotExist(error_message)
try:
issued_credential = IssuedCredentialModel.objects.get(credential=credential)
except IssuedCredentialModel.DoesNotExist:
error_message = f'No issued credential found for certificate with fingerprint {cert_fingerprint}'
raise IssuedCredentialModel.DoesNotExist(error_message) from None
return issued_credential
[docs]
class RemoteDeviceCredentialDownloadModel(models.Model):
"""Model to associate a credential model with an OTP and token for unauthenticated remoted download."""
[docs]
BROWSER_MAX_OTP_ATTEMPTS = 3
[docs]
TOKEN_VALIDITY = datetime.timedelta(minutes=3)
[docs]
issued_credential_model = models.OneToOneField(IssuedCredentialModel, on_delete=models.CASCADE)
[docs]
otp = models.CharField(_('OTP'), max_length=32, default='')
[docs]
device = models.ForeignKey(DeviceModel, on_delete=models.CASCADE)
[docs]
attempts = models.IntegerField(_('Attempts'), default=0)
[docs]
download_token = models.CharField(_('Download Token'), max_length=64, default='')
[docs]
token_created_at = models.DateTimeField(_('Token Created'), null=True)
[docs]
def __str__(self) -> str:
"""Return a string representation of the model."""
return f'RemoteDeviceCredentialDownloadModel(credential={self.issued_credential_model.id})'
[docs]
def save(self, *args: Any, **kwargs: Any) -> None:
"""Generates a new random OTP on initial save of the model."""
if not self.otp:
self.otp = secrets.token_urlsafe(8)
super().save(*args, **kwargs)
[docs]
def get_otp_display(self) -> str:
"""Return the OTP in the format 'credential_id.otp' for display within the admin view.
Returns:
The str to display.
"""
if not self.otp or self.otp == '-':
return 'OTP no longer valid'
return f'{self.issued_credential_model.id}.{self.otp}'
[docs]
def check_otp(self, otp: str) -> bool:
"""Check if the provided OTP matches the stored OTP.
Args:
otp: The OTP to check.
Returns:
True if the OTP is valid, False otherwise.
"""
if not self.otp or self.otp == '-':
return False
matches = otp == self.otp
if not matches:
self.attempts += 1
if self.attempts >= self.BROWSER_MAX_OTP_ATTEMPTS:
self.otp = '-'
self.delete()
else:
self.save()
return False
self.otp = '-'
self.download_token = secrets.token_urlsafe(32)
self.token_created_at = timezone.now()
self.save()
return True
[docs]
def check_token(self, token: str) -> bool:
"""Check if the provided token matches the stored token and whether it is still valid.
Args:
token: The token to check.
Returns:
True if the token is valid, false otherwise.
"""
if not self.download_token or not self.token_created_at:
return False
if timezone.now() - self.token_created_at > self.TOKEN_VALIDITY:
self.delete()
return False
return token == self.download_token