Source code for notifications.models

"""Module that contains all models corresponding to the notifications app."""

from __future__ import annotations

import logging
from typing import TYPE_CHECKING, Any, cast

from devices.models import DeviceModel
from django.db import models
from django.utils.encoding import force_str
from django.utils.translation import gettext_lazy as _
from django_stubs_ext.db.models import TypedModelMeta
from pki.models.certificate import CertificateModel
from pki.models.domain import DomainModel
from pki.models.issuing_ca import IssuingCaModel

if TYPE_CHECKING:
    from django_stubs_ext import StrOrPromise

# Module-level logger shared by the notification models.
# NOTE(review): the logger is named 'tp.home' although this module is
# notifications.models - confirm that this name is intentional.
log: logging.Logger = logging.getLogger('tp.home')
class NotificationStatus(models.Model):
    """Model holding one status value that can be attached to notifications."""

    class StatusChoices(models.TextChoices):
        """Supported status values as (stored code, translatable label) pairs."""

        NEW = 'NEW', _('New')
        CONFIRMED = 'CONF', _('Confirmed')
        IN_PROGRESS = 'PROG', _('In Progress')
        SOLVED = 'SOLV', _('Solved')
        NOT_SOLVED = 'NOSOL', _('Not Solved')
        ESCALATED = 'ESC', _('Escalated')
        SUSPENDED = 'SUS', _('Suspended')
        REJECTED = 'REJ', _('Rejected')
        DELETED = 'DEL', _('Deleted')
        CLOSED = 'CLO', _('Closed')
        ACKNOWLEDGED = 'ACK', _('Acknowledged')
        FAILED = 'FAIL', _('Failed')
        EXPIRED = 'EXP', _('Expired')
        PENDING = 'PEND', _('Pending')

    # unique=True: every status code is stored at most once in this table.
    status = models.CharField(max_length=20, choices=StatusChoices, unique=True)

    if TYPE_CHECKING:
        # Typing-only declaration; it is never executed at runtime.

        def get_status_display(self) -> str:
            """Gets the status as human-readable string for displaying in the front-end.

            Returns:
                Human-readable string for displaying in the front-end.
            """

    class Meta(TypedModelMeta):
        """Meta class configuration."""

    def __str__(self) -> str:
        """Returns a human-readable string.

        Returns:
            The display label of the status.
        """
        display_label = self.get_status_display()
        return str(display_label)
class NotificationMessageModel(models.Model):
    """Stored notification text consisting of a short and a long description."""

    # Short headline text (at most 255 characters).
    short_description = models.CharField(max_length=255)
    # Detailed text; falls back to a fixed placeholder when none is supplied.
    long_description = models.CharField(max_length=65536, default='No description provided')

    class Meta(TypedModelMeta):
        """Meta class configuration."""

    def __str__(self) -> str:
        """Returns a human-readable string.

        Returns:
            The first 50 characters of the short description.
        """
        preview = self.short_description[:50]
        return preview
class NotificationMessage:
    """In-memory notification text made of a short and a long description."""

    short_description: str
    long_description: str = 'No description provided'

    def __init__(
        self,
        short_description: StrOrPromise,
        long_description: StrOrPromise = 'No description provided',
    ) -> None:
        """Initializes a NotificationMessage instance.

        Both arguments are passed through ``force_str`` so that the instance
        stores plain strings.

        Args:
            short_description: The short description (plain or lazily translated).
            long_description: The long description (plain or lazily translated).
        """
        self.short_description = force_str(short_description)
        self.long_description = force_str(long_description)

    @property
    def short(self) -> str:
        """Returns the short description.

        Returns:
            The short description.
        """
        return self.short_description

    @property
    def long(self) -> str:
        """Returns the long description.

        Returns:
            The long description.
        """
        return self.long_description

    def __str__(self) -> str:
        """Returns a human-readable string.

        Returns:
            The first 50 characters of the short description.
        """
        return self.short[:50]
class NotificationModel(models.Model):
    """Notifications Model.

    A notification either carries a custom message (stored in
    ``NotificationMessageModel`` and referenced through ``message``) or refers
    to a predefined message type whose text templates are filled with the
    values stored in ``message_data``.
    """

    class NotificationTypes(models.TextChoices):
        """Supported Notification Types."""

        SETUP = 'SET', _('SETUP')
        INFO = 'INF', _('INFO')
        WARNING = 'WAR', _('WARNING')
        CRITICAL = 'CRI', _('CRITICAL')

    class NotificationSource(models.TextChoices):
        """Origin of the Notification."""

        SYSTEM = 'S', _('System')
        DOMAIN = 'D', _('Domain')
        DEVICE = 'E', _('Device')
        ISSUING_CA = 'I', _('Issuing CA')
        CERTIFICATE = 'C', _('Certificate')

    class NotificationMessageType(models.TextChoices):
        """Types of messages (aka. unique strings to that particular notification)."""

        CUSTOM = 'C', 'custom'  # custom message, i.e. strings are stored in the database. Can not be translated.

        # Test notifications, generated by create_notifications.py
        ISSUING_CA_TEST = 'TEST_CA'
        DOMAIN_TEST = 'TEST_DOMAIN'
        CERT_TEST = 'TEST_CERT'
        DEVICE_TEST = 'TEST_DEVICE'

        # Welcome notifications
        WELCOME_POPULATE_TEST_DATA = 'POP_TEST_DATA'
        TRUSTPOINT_DOCUMENTATION = 'TP_DOCS'
        TRUSTPOINT_PROJECT_INFO = 'TP_INFO'
        WELCOME_MESSAGE = 'WELCOME'

        # Periodic task notifications
        SYSTEM_NOT_HEALTHY = 'SYS_NOT_HEALTHY'
        VULNERABILITY = 'VULNERABILITY'
        CERT_EXPIRING = 'CERT_EXPIRING'
        CERT_EXPIRED = 'CERT_EXPIRED'
        ISSUING_CA_EXPIRING = 'CA_EXPIRING'
        ISSUING_CA_EXPIRED = 'CA_EXPIRED'
        DOMAIN_NO_ISSUING_CA = 'DOMAIN_NO_CA'
        DEVICE_NOT_ONBOARDED = 'DEV_NOT_ONBRD'
        DEVICE_ONBOARDING_FAILED = 'DEV_ONBRD_FAIL'
        DEVICE_CERT_REVOKED = 'DEV_CERT_REV'
        WEAK_SIGNATURE_ALGORITHM = 'WEAK_SIG_ALGO'
        INSUFFICIENT_KEY_LENGTH = 'INSUFF_KEY_LEN'
        WEAK_ECC_CURVE = 'WEAK_ECC_CURVE'

        def get_message(self) -> NotificationMessage:
            """Returns the message for the given type.

            The returned texts are templates: placeholders such as ``{device}``
            are left untouched here and are filled in by the caller.

            Returns:
                The notification message, or a generic "unknown" message if
                this member has no entry in the mapping.
            """
            message_dict = {
                self.CUSTOM: NotificationMessage(_('Custom Message')),
                # Test notifications, generated by create_notifications.py
                self.ISSUING_CA_TEST: NotificationMessage(
                    _('Test for Issuing CA: {ca}'), _('Notification for Issuing CA: {ca}')
                ),
                self.DOMAIN_TEST: NotificationMessage(
                    _('Test for Domain: {domain}'), _('Notification for Domain: {domain}')
                ),
                self.CERT_TEST: NotificationMessage(
                    _('Test for Certificate: {cn}'),
                    _('Notification for Certificate: Common Name {cn} with Serial Number {sn}'),
                ),
                self.DEVICE_TEST: NotificationMessage(
                    _('Test for Device: {device}'), _('Notification for Device: {device}')
                ),
                # Welcome notifications
                self.WELCOME_POPULATE_TEST_DATA: NotificationMessage(
                    _('Populate test data'),
                    _('Click <a href="{url}">here</a> to add test issuing CAs, domains and devices.'),
                ),
                self.TRUSTPOINT_DOCUMENTATION: NotificationMessage(
                    _('Access the Trustpoint Documentation'),
                    _('You can find the official Trustpoint documentation here: {link}'),
                ),
                self.TRUSTPOINT_PROJECT_INFO: NotificationMessage(
                    _('Explore the Trustpoint project'),
                    _(
                        'Visit the Trustpoint GitHub repository for more information: '
                        '<a href="{url_github}" target="_blank">Trustpoint GitHub</a><br>'
                        'Learn more about industrial security and the Trustpoint project on our '
                        '<a href="{url_homepage}" target="_blank">homepage</a>'
                    ),
                ),
                self.WELCOME_MESSAGE: NotificationMessage(
                    _('Welcome to Trustpoint!'),
                    _(
                        'Thank you for setting up Trustpoint. '
                        'This system will help you manage your certificates and secure your environment.'
                    ),
                ),
                # Periodic task notifications
                self.SYSTEM_NOT_HEALTHY: NotificationMessage(
                    _('System health check failed'),
                    _(
                        'The system health check detected an issue with one or more services. '
                        'Please investigate immediately.'
                    ),
                ),
                self.VULNERABILITY: NotificationMessage(
                    _('Security vulnerability detected'),
                    _(
                        'A security vulnerability affecting system components has been detected. '
                        'Immediate attention required.'
                    ),
                ),
                self.CERT_EXPIRING: NotificationMessage(
                    _('Certificate {common_name} is expiring soon'),
                    _('The certificate {common_name} is set to expire on {not_valid_after}.'),
                ),
                self.CERT_EXPIRED: NotificationMessage(
                    _('Certificate {common_name} has expired'),
                    _('The certificate {common_name} expired on {not_valid_after}.'),
                ),
                self.ISSUING_CA_EXPIRING: NotificationMessage(
                    _('Issuing CA {unique_name} is expiring soon'),
                    _('The issuing CA {unique_name} is set to expire on {not_valid_after}.'),
                ),
                self.ISSUING_CA_EXPIRED: NotificationMessage(
                    _('Issuing CA {unique_name} has expired'),
                    _('The issuing CA {unique_name} expired on {not_valid_after}.'),
                ),
                self.DOMAIN_NO_ISSUING_CA: NotificationMessage(
                    _('Domain {unique_name} has no Issuing CA assigned'),
                    _('The domain {unique_name} currently has no Issuing CA assigned.'),
                ),
                self.DEVICE_NOT_ONBOARDED: NotificationMessage(
                    _('Device {device} is not onboarded in {domain}'),
                    _('The device {device} has not completed onboarding.'),
                ),
                self.DEVICE_ONBOARDING_FAILED: NotificationMessage(
                    _('Device {device} onboarding failed'), _('The device {device} failed onboarding.')
                ),
                self.DEVICE_CERT_REVOKED: NotificationMessage(
                    _('Device {device} certificate revoked'),
                    _('The device {device} has had its certificate revoked. The device may no longer be trusted.'),
                ),
                self.WEAK_SIGNATURE_ALGORITHM: NotificationMessage(
                    _('Certificate {common_name} uses a weak signature algorithm'),
                    _('The certificate {common_name} is signed using {signature_algorithm}, which is considered weak.'),
                ),
                self.INSUFFICIENT_KEY_LENGTH: NotificationMessage(
                    _('Certificate {common_name} uses insufficient key length'),
                    _(
                        'The certificate {common_name} uses an RSA key size of {spki_key_size} bits, '
                        'which is less than the recommended 2048 bits.'
                    ),
                ),
                self.WEAK_ECC_CURVE: NotificationMessage(
                    _('Certificate {common_name} uses a weak ECC curve'),
                    _(
                        'The certificate {common_name} is using the {spki_ec_curve} ECC curve, '
                        'which is no longer recommended.'
                    ),
                ),
            }
            # Fallback for a member that was added to the enum but not to the mapping above.
            default = NotificationMessage(
                _('Unknown Notification message string.'),
                _('Guess we messed up. Type of this notification is %(type)s') % {'type': self},
            )
            return message_dict.get(self, default)

    notification_type = models.CharField(max_length=3, choices=NotificationTypes, default=NotificationTypes.INFO)
    notification_source = models.CharField(max_length=1, choices=NotificationSource, default=NotificationSource.SYSTEM)
    message_type = models.CharField(
        max_length=32, choices=NotificationMessageType, default=NotificationMessageType.CUSTOM
    )
    # Values substituted into the placeholders of the predefined message templates.
    message_data = models.JSONField(blank=True, default=dict)

    # Optional references to the object the notification is about. SET_NULL keeps
    # the notification when the referenced object is deleted.
    domain = models.ForeignKey(
        DomainModel, on_delete=models.SET_NULL, blank=True, null=True, related_name='notifications'
    )
    certificate = models.ForeignKey(
        CertificateModel, on_delete=models.SET_NULL, blank=True, null=True, related_name='notifications'
    )
    device = models.ForeignKey(
        DeviceModel, on_delete=models.SET_NULL, blank=True, null=True, related_name='notifications'
    )
    issuing_ca = models.ForeignKey(
        IssuingCaModel, on_delete=models.SET_NULL, blank=True, null=True, related_name='notifications'
    )

    event = models.CharField(max_length=255, blank=True, default='')
    message = models.ForeignKey(  # only for custom messages
        NotificationMessageModel, on_delete=models.CASCADE, null=True, related_name='notifications'
    )
    statuses = models.ManyToManyField(NotificationStatus, related_name='notifications')
    created_at = models.DateTimeField(auto_now_add=True, verbose_name=_('Created at'))

    if TYPE_CHECKING:
        # Typing-only declarations; they are never executed at runtime.

        def get_notification_type_display(self) -> str:
            """Gets the notification type as human-readable string for displaying in the front-end.

            Returns:
                Human-readable string for displaying in the front-end.
            """

        def get_notification_source_display(self) -> str:
            """Gets the notification source as human-readable string for displaying in the front-end.

            Returns:
                Human-readable string for displaying in the front-end.
            """

        def get_message_type_display(self) -> str:
            """Gets the message type as human-readable string for displaying in the front-end.

            Returns:
                Human-readable string for displaying in the front-end.
            """

    class Meta(TypedModelMeta):
        """Meta class configuration."""

    def __str__(self) -> str:
        """Returns a human-readable string.

        Returns:
            The notification type to display with message description if available.
        """
        message_text = 'No message'
        # ``message`` is a nullable foreign key, so it may be None.
        if self.message is not None:
            message_text = self.message.short_description[:20]
        return f'{self.get_notification_type_display()} - {message_text}'

    def _format_with_message_data(self, template: str) -> str:
        """Fills the placeholders of ``template`` with the values from ``message_data``.

        Args:
            template: A ``str.format`` style template.

        Returns:
            The formatted text. If ``message_data`` lacks a value required by
            the template, the unformatted template is returned instead so that
            a single malformed notification does not raise while being rendered.
        """
        # Anything other than a dict cannot be unpacked into keyword arguments.
        data = self.message_data if isinstance(self.message_data, dict) else {}
        try:
            return template.format(**data)
        except (KeyError, IndexError):
            log.warning(
                'Notification %s: message_data does not provide all placeholders of message type %s.',
                self.pk,
                self.message_type,
            )
            return template

    @property
    def short_translated(self) -> Any:
        """Returns the translated short description.

        Returns:
            The translated short description. For custom notifications this is
            the stored short description, or a placeholder text if no message
            is attached.
        """
        if self.message_type == NotificationModel.NotificationMessageType.CUSTOM:
            # ``message`` is nullable; avoid an AttributeError on a missing message.
            if self.message is None:
                return force_str(_('No message'))
            return self.message.short_description

        try:
            message_string = NotificationModel.NotificationMessageType(self.message_type).get_message()
        except ValueError:
            # The stored value is not a member of NotificationMessageType.
            return force_str(_('Unknown Notification message type.'))

        return self._format_with_message_data(message_string.short)

    @property
    def long_translated(self) -> Any:
        """Returns the translated long description.

        Returns:
            The translated long description. For custom notifications this is
            the stored long description, or a placeholder text if no message
            is attached.
        """
        if self.message_type == NotificationModel.NotificationMessageType.CUSTOM:
            # ``message`` is nullable; avoid an AttributeError on a missing message.
            if self.message is None:
                return force_str(_('No description provided'))
            return self.message.long_description

        try:
            message_string = NotificationModel.NotificationMessageType(self.message_type).get_message()
        except ValueError:
            # The stored value is not a member of NotificationMessageType.
            return _('Guess we messed up. Type of this notification is %(type)s') % {'type': self.message_type}

        return self._format_with_message_data(message_string.long)
class WeakECCCurve(models.Model):
    """A single ECC curve, identified by its OID, that is treated as weak or deprecated."""

    objects: models.Manager['WeakECCCurve']

    class ECCCurveChoices(models.TextChoices):
        """Selectable curves as (OID, display name) pairs."""

        SECP160R1 = '1.3.132.0.8', _('SECP160R1')
        SECP192R1 = '1.2.840.10045.3.1.1', _('SECP192R1')
        SECP224R1 = '1.3.132.0.33', _('SECP224R1')
        SECP256K1 = '1.3.132.0.10', _('SECP256K1')
        SECT163K1 = '1.3.132.0.1', _('SECT163K1')
        SECT163R2 = '1.3.132.0.15', _('SECT163R2')
        SECT233K1 = '1.3.132.0.26', _('SECT233K1')
        SECT233R1 = '1.3.132.0.27', _('SECT233R1')
        SECT283K1 = '1.3.132.0.16', _('SECT283K1')

    # Dotted OID string; each OID may be stored only once (unique=True).
    oid = models.CharField(
        max_length=64,
        choices=ECCCurveChoices.choices,
        unique=True,
    )

    def __str__(self) -> str:
        """Return the human-readable name for the ECC curve.

        Returns:
            The display name registered for ``oid``, or the OID itself when it
            is not one of the known choices.
        """
        names_by_oid = dict(self.ECCCurveChoices.choices)  # type: ignore[misc]
        curve_name = names_by_oid.get(self.oid, self.oid)
        return str(curve_name)
class WeakSignatureAlgorithm(models.Model):
    """A single algorithm, identified by its OID, that is treated as weak or deprecated."""

    objects: models.Manager[WeakSignatureAlgorithm]

    class SignatureChoices(models.TextChoices):
        """Selectable algorithms as (OID, display name) pairs.

        NOTE(review): the values look like hash-algorithm OIDs (MD5, SHA-1,
        SHA-224) rather than combined signature OIDs - confirm against the
        code that performs the comparison.
        """

        MD5 = '1.2.840.113549.2.5', _('MD5')
        SHA1 = '1.3.14.3.2.26', _('SHA-1')
        SHA224 = '2.16.840.1.101.3.4.2.4', _('SHA-224')

    # Dotted OID string; each OID may be stored only once (unique=True).
    oid = models.CharField(
        max_length=64,
        choices=SignatureChoices.choices,
        unique=True,
    )

    def __str__(self) -> str:
        """Return the human-readable name for the weak signature algorithm.

        Returns:
            The display name registered for ``oid``, or the OID itself when it
            is not one of the known choices.
        """
        names_by_oid = dict(self.SignatureChoices.choices)  # type: ignore[misc]
        algorithm_name = names_by_oid.get(self.oid, self.oid)
        return str(algorithm_name)
class NotificationConfig(models.Model):
    """Global thresholds and settings for notifications.

    Use :meth:`get` to obtain the settings object; it creates one on first use.
    """

    objects: models.Manager[NotificationConfig]

    cert_expiry_warning_days = models.PositiveIntegerField(
        default=30,
        help_text=_("Number of days before a certificate's expiration to trigger a 'Certificate Expiring' warning."),
    )

    issuing_ca_expiry_warning_days = models.PositiveIntegerField(
        default=30,
        help_text=_("Number of days before an issuing CA's certificate expiration to trigger a warning."),
    )

    rsa_minimum_key_size = models.PositiveIntegerField(
        default=2048,
        help_text=_('Minimum RSA key size (in bits) that certificates must meet to avoid being flagged as insecure.'),
    )

    weak_ecc_curves = models.ManyToManyField(
        WeakECCCurve,
        blank=True,
        help_text=_('Select ECC curves considered weak or deprecated.'),
    )

    weak_signature_algorithms = models.ManyToManyField(
        WeakSignatureAlgorithm,
        blank=True,
        help_text=_('Select signature algorithms considered weak or deprecated.'),
    )  # type: ignore

    class Meta:
        """Meta class configuration."""

        verbose_name = _('Notification Configuration')

    def __str__(self) -> str:
        """Return the human-readable name for the notification configuration.

        Returns:
            The fixed label 'Notification Settings'.
        """
        return 'Notification Settings'

    @classmethod
    def get(cls) -> NotificationConfig:
        """Ensure there's always one settings object to use.

        Returns:
            The first stored configuration, or a newly created one (with the
            field defaults) when the table is empty.
        """
        existing = cls.objects.first()
        if existing is not None:
            return existing
        return cls.objects.create()