Source code for devices.models

"""This modules contains all models specific to the device abstractions."""

from __future__ import annotations

import datetime
import secrets
from typing import TYPE_CHECKING

from cryptography.hazmat.primitives import hashes
from django.core.exceptions import ValidationError
from django.db import models
from django.utils import timezone
from django.utils.translation import gettext_lazy as _
from django_stubs_ext.db.models import TypedModelMeta
from pyasn1_modules.rfc3280 import common_name  # type: ignore[import-untyped]

from onboarding.models import (
    AbstractPkiProtocolModel,
    NoOnboardingConfigModel,
    NoOnboardingPkiProtocol,
    OnboardingConfigModel,
    OnboardingPkiProtocol,
    OnboardingProtocol,
    OnboardingStatus,
)
from pki.models.certificate import CertificateModel, RevokedCertificateModel
from pki.models.credential import CredentialModel
from pki.models.domain import DomainModel
from util.db import CustomDeleteActionModel

if TYPE_CHECKING:
    from typing import Any

    from cryptography import x509


__all__ = [
    'AbstractPkiProtocolModel',
    'DeviceModel',
    'IssuedCredentialModel',
    'NoOnboardingConfigModel',
    'NoOnboardingPkiProtocol',
    'OnboardingConfigModel',
    'OnboardingPkiProtocol',
    'OnboardingProtocol',
    'OnboardingStatus',
    'RemoteDeviceCredentialDownloadModel',
]


[docs] class DeviceModel(CustomDeleteActionModel): """The DeviceModel."""
[docs] common_name = models.CharField(_('Device'), max_length=100, default='', unique=True)
[docs] serial_number = models.CharField(_('Serial-Number'), max_length=100, default='', blank=True, null=False)
[docs] ip_address = models.GenericIPAddressField(_('IP Address'), protocol='both', unpack_ipv4=True, null=True, blank=True)
[docs] opc_server_port = models.PositiveIntegerField(_('OPC Server Port'), default=0, blank=True, null=False)
[docs] domain = models.ForeignKey( DomainModel, verbose_name=_('Domain'), related_name='devices', blank=True, null=True, on_delete=models.PROTECT )
[docs] onboarding_config = models.ForeignKey( OnboardingConfigModel, verbose_name=_('Onboarding Config'), related_name='device', blank=True, null=True, on_delete=models.PROTECT, )
[docs] no_onboarding_config = models.ForeignKey( NoOnboardingConfigModel, verbose_name=_('No Onboarding Config'), related_name='device', blank=True, null=True, on_delete=models.PROTECT, )
[docs] class DeviceType(models.IntegerChoices): """Enum for device type."""
[docs] GENERIC_DEVICE = 0, _('Generic Device')
[docs] OPC_UA_GDS = 1, _('OPC UA GDS')
[docs] OPC_UA_GDS_PUSH = 2, _('OPC UA GDS Push')
[docs] device_type = models.IntegerField( choices=DeviceType, verbose_name=_('Device Type'), default=DeviceType.GENERIC_DEVICE, )
[docs] created_at = models.DateTimeField(verbose_name=_('Created'), auto_now_add=True)
[docs] def __str__(self) -> str: """Returns a human-readable string representation.""" return f'DeviceModel(common_name={self.common_name})'
[docs] def pre_delete(self) -> None: """Delete all issued credentials for this device before deleting the device itself.""" self.issued_credentials.all().delete()
@property
[docs] def est_username(self) -> str: """Gets the EST username.""" return self.common_name
[docs] def clean(self) -> None: """Validation before saving the model.""" error_messages = {} if not (self.onboarding_config or self.no_onboarding_config): error_messages['onboarding_config'] = 'Either onboarding or no-onboarding has to be configured.' if self.onboarding_config and self.no_onboarding_config: error_messages['onboarding_config'] = 'Only one of onboarding or no-onboarding can be configured.' if self.device_type == DeviceModel.DeviceType.OPC_UA_GDS_PUSH: if not self.onboarding_config: error_messages['device_type'] = 'OPC UA GDS Push devices must use onboarding configuration.' elif self.onboarding_config.onboarding_protocol != OnboardingProtocol.OPC_GDS_PUSH: error_messages['device_type'] = 'OPC UA GDS Push devices must use OPC_GDS_PUSH onboarding protocol.' if not self.ip_address: error_messages['ip_address'] = 'OPC UA GDS Push devices must have an IP address.' if not self.opc_server_port or self.opc_server_port == 0: error_messages['opc_server_port'] = 'OPC UA GDS Push devices must have a valid OPC server port.' if error_messages: raise ValidationError(error_messages)
[docs] class IssuedCredentialModel(CustomDeleteActionModel): """Model for all credentials and certificates that have been issued or requested by the Trustpoint."""
[docs] class IssuedCredentialType(models.IntegerChoices): """The type of the credential."""
[docs] DOMAIN_CREDENTIAL = 0, _('Domain Credential')
[docs] APPLICATION_CREDENTIAL = 1, _('Application Credential')
[docs] id = models.AutoField(primary_key=True)
[docs] common_name = models.CharField(verbose_name=_('Common Name'), max_length=255)
[docs] issued_credential_type = models.IntegerField(choices=IssuedCredentialType, verbose_name=_('Credential Type'))
[docs] issued_using_cert_profile = models.CharField( max_length=255, verbose_name=_('Issued using Certificate Profile'), default='' )
[docs] credential = models.OneToOneField( CredentialModel, verbose_name=_('Credential'), on_delete=models.CASCADE, related_name='issued_credential', null=False, blank=False, )
[docs] device = models.ForeignKey( 'devices.DeviceModel', verbose_name=_('Device'), on_delete=models.PROTECT, related_name='issued_credentials' )
[docs] domain = models.ForeignKey( DomainModel, verbose_name=_('Domain'), on_delete=models.PROTECT, related_name='issued_credentials' )
[docs] created_at = models.DateTimeField(verbose_name=_('Created'), auto_now_add=True)
[docs] class Meta(TypedModelMeta): """Meta class configuration."""
[docs] def __str__(self) -> str: """Returns a human-readable string representation.""" return f'IssuedCredentialModel(common_name={common_name})'
[docs] def revoke(self) -> None: """Revokes all active certificates associated with this credential.""" if self.domain.issuing_ca is None: return ca = self.domain.issuing_ca cert: CertificateModel for cert in self.credential.certificates.all(): status = cert.certificate_status if status in (CertificateModel.CertificateStatus.REVOKED, CertificateModel.CertificateStatus.EXPIRED): continue RevokedCertificateModel.objects.create( certificate=cert, revocation_reason=RevokedCertificateModel.ReasonCode.CESSATION, ca=ca )
[docs] def pre_delete(self) -> None: """Revoke all active certificates and delete the credential.""" self.revoke() self.credential.delete() # this will also delete the IssuedCredentialModel via cascade
[docs] def is_valid_domain_credential(self) -> tuple[bool, str]: """Determines if this issued credential is valid for enrolling new application credentials. This method performs the following checks: 1. The IssuedCredentialModel type must be of type DOMAIN_CREDENTIAL. 2. The credential must be of type ISSUED_CREDENTIAL. 3. A primary certificate must exist. 4. The certificate's status must be 'OK'. Returns: tuple[bool, str]: A tuple where: - The first value is True if the credential meets all criteria, False otherwise. - The second value is a reason string explaining why the credential is invalid. """ if self.issued_credential_type != IssuedCredentialModel.IssuedCredentialType.DOMAIN_CREDENTIAL: return False, 'Invalid issued credential type: Must be DOMAIN_CREDENTIAL.' result, reason = self.credential.is_valid_issued_credential() if not result: return False, reason return True, 'Valid domain credential.'
@staticmethod
[docs] def get_credential_for_certificate(cert: x509.Certificate) -> IssuedCredentialModel: """Retrieve an IssuedCredentialModel instance for the given certificate. :param cert: x509.Certificate to search for. :return: The corresponding IssuedCredentialModel instance. :raises ClientCertificateAuthenticationError: if no matching issued credential is found. """ cert_fingerprint = cert.fingerprint(hashes.SHA256()).hex().upper() credential = CredentialModel.objects.filter(certificates__sha256_fingerprint=cert_fingerprint).first() if not credential: error_message = f'No credential found for certificate with fingerprint {cert_fingerprint}' raise IssuedCredentialModel.DoesNotExist(error_message) try: issued_credential = IssuedCredentialModel.objects.get(credential=credential) except IssuedCredentialModel.DoesNotExist: error_message = f'No issued credential found for certificate with fingerprint {cert_fingerprint}' raise IssuedCredentialModel.DoesNotExist(error_message) from None return issued_credential
[docs] class RemoteDeviceCredentialDownloadModel(models.Model): """Model to associate a credential model with an OTP and token for unauthenticated remoted download."""
[docs] BROWSER_MAX_OTP_ATTEMPTS = 3
[docs] TOKEN_VALIDITY = datetime.timedelta(minutes=3)
[docs] issued_credential_model = models.OneToOneField(IssuedCredentialModel, on_delete=models.CASCADE)
[docs] otp = models.CharField(_('OTP'), max_length=32, default='')
[docs] device = models.ForeignKey('devices.DeviceModel', on_delete=models.CASCADE)
[docs] attempts = models.IntegerField(_('Attempts'), default=0)
[docs] download_token = models.CharField(_('Download Token'), max_length=64, default='')
[docs] token_created_at = models.DateTimeField(_('Token Created'), null=True)
[docs] class Meta(TypedModelMeta): """Meta class configuration."""
[docs] def __str__(self) -> str: """Return a string representation of the model.""" return f'RemoteDeviceCredentialDownloadModel(credential={self.issued_credential_model.id})'
[docs] def save(self, *args: Any, **kwargs: Any) -> None: """Generates a new random OTP on initial save of the model.""" if not self.otp: self.otp = secrets.token_urlsafe(8) super().save(*args, **kwargs)
[docs] def get_otp_display(self) -> str: """Return the OTP in the format 'credential_id.otp' for display within the admin view. Returns: The str to display. """ if not self.otp or self.otp == '-': return 'OTP no longer valid' return f'{self.issued_credential_model.id}.{self.otp}'
[docs] def check_otp(self, otp: str) -> bool: """Check if the provided OTP matches the stored OTP. Args: otp: The OTP to check. Returns: True if the OTP is valid, False otherwise. """ if not self.otp or self.otp == '-': return False matches = otp == self.otp if not matches: self.attempts += 1 if self.attempts >= self.BROWSER_MAX_OTP_ATTEMPTS: self.otp = '-' self.delete() else: self.save() return False self.otp = '-' self.download_token = secrets.token_urlsafe(32) self.token_created_at = timezone.now() self.save() return True
[docs] def check_token(self, token: str) -> bool: """Check if the provided token matches the stored token and whether it is still valid. Args: token: The token to check. Returns: True if the token is valid, false otherwise. """ if not self.download_token or not self.token_created_at: return False if timezone.now() - self.token_created_at > self.TOKEN_VALIDITY: self.delete() return False return token == self.download_token