"""Views for managing Domains."""
from __future__ import annotations
from typing import TYPE_CHECKING, Any, cast
from cryptography.hazmat.primitives import serialization
from devices.views import ActiveTrustpointTlsServerCredentialModelMissingErrorMsg, NamedCurveMissingForEccErrorMsg
from django.contrib import messages
from django.core.exceptions import ValidationError
from django.db.models import ProtectedError
from django.forms import BaseModelForm
from django.http import Http404, HttpResponse, HttpResponseRedirect
from django.shortcuts import get_object_or_404
from django.urls import reverse, reverse_lazy
from django.utils.translation import gettext_lazy as _
from django.views.generic import DeleteView
from django.views.generic.detail import DetailView
from django.views.generic.edit import CreateView, FormView, UpdateView
from django.views.generic.list import ListView
from management.models import TlsSettings
from trustpoint_core import oid
from pki.forms import DevIdAddMethodSelectForm, DevIdRegistrationForm
from pki.models import CertificateModel, DevIdRegistration, DomainModel, IssuingCaModel
from pki.models.truststore import ActiveTrustpointTlsServerCredentialModel, TruststoreModel
from trustpoint.settings import UIConfig
from trustpoint.views.base import (
BulkDeleteView,
ContextDataMixin,
ListInDetailView,
SortableTableMixin,
)
if TYPE_CHECKING:
from django.db.models import QuerySet
from django.forms import Form
from django.http import HttpRequest
[docs]
class DomainContextMixin(ContextDataMixin):
"""Mixin which adds context_data for the PKI -> Issuing CAs pages."""
[docs]
context_page_category = 'pki'
[docs]
context_page_name = 'domains'
[docs]
class DomainTableView(DomainContextMixin, SortableTableMixin, ListView[DomainModel]):
"""Domain Table View."""
[docs]
model = DomainModel
[docs]
template_name = 'pki/domains/domain.html' # Template file
[docs]
context_object_name = 'domain-new'
[docs]
paginate_by = UIConfig.paginate_by
[docs]
default_sort_param = 'unique_name'
[docs]
class DomainCreateView(DomainContextMixin, CreateView[DomainModel, BaseModelForm[DomainModel]]):
"""View to create a new domain."""
[docs]
model = DomainModel
[docs]
fields = '__all__'
[docs]
template_name = 'pki/domains/add.html'
[docs]
success_url = reverse_lazy('pki:domains')
[docs]
ignore_url = reverse_lazy('pki:domains')
[docs]
def get_form(self, _form_class: Any = None) -> Any:
"""Override get_form to filter out autogen root CAs."""
form = super().get_form()
# Filter out autogen root CAs
form.fields['issuing_ca'].queryset = IssuingCaModel.objects.exclude(
issuing_ca_type=IssuingCaModel.IssuingCaTypeChoice.AUTOGEN_ROOT
).filter(is_active=True)
form.fields['issuing_ca'].empty_label = None # Remove empty "---------" choice
del form.fields['is_active']
return form
[docs]
def form_valid(self, form: BaseModelForm[DomainModel]) -> HttpResponse:
"""Handle the case where the form is valid."""
domain = form.save()
messages.success(
self.request,
_('Successfully created domain {name}.').format(name=domain.unique_name),
)
return super().form_valid(form)
[docs]
class DomainUpdateView(DomainContextMixin, UpdateView[DomainModel]):
"""View to edit a domain."""
# TODO(Air): This view is currently UNUSED. # noqa: FIX002
# If used, a mixin implementing the get_form method from the DomainCreateView should be added.
[docs]
model = DomainModel
[docs]
fields = '__all__'
[docs]
template_name = 'pki/domains/add.html'
[docs]
success_url = reverse_lazy('pki:domains')
[docs]
ignore_url = reverse_lazy('pki:domains')
[docs]
class DomainDevIdRegistrationTableMixin(SortableTableMixin, ListInDetailView):
"""Mixin to add a table of DevID Registrations to the domain config view."""
[docs]
model = DevIdRegistration
[docs]
paginate_by = UIConfig.paginate_by
[docs]
context_object_name = 'devid_registrations'
[docs]
default_sort_param = 'unique_name'
[docs]
def get_queryset(self) -> QuerySet[DevIdRegistration]:
"""Gets the queryset for the DevID Registration table."""
self.queryset = DevIdRegistration.objects.filter(domain=self.get_object())
return super().get_queryset()
[docs]
class DomainConfigView(DomainContextMixin, DomainDevIdRegistrationTableMixin, ListInDetailView):
"""View to configure a domain, allows adding DevID registration patterns."""
[docs]
detail_model = DomainModel
[docs]
template_name = 'pki/domains/config.html'
[docs]
detail_context_object_name = 'domain'
[docs]
success_url = reverse_lazy('pki:domains')
[docs]
def get_context_data(self, **kwargs: Any) -> dict[str, Any]:
"""Adds (no) additional context data."""
context = super().get_context_data(**kwargs)
domain: DomainModel = cast(DomainModel, self.get_object())
issued_credentials = domain.issued_credentials.all()
certificates = CertificateModel.objects.filter(
credential__in=[issued_credential.credential for issued_credential in issued_credentials]
)
context['certificates'] = certificates
context['domain_options'] = {}
context['domain_help_texts'] = {}
context['domain_verbose_name'] = {}
return context
[docs]
def post(self, request: HttpRequest, *args: Any, **kwargs: Any) -> HttpResponse:
"""Handle config form submission."""
del args
del kwargs
domain = self.get_object()
domain.save()
messages.success(request, _('Settings updated successfully.'))
return HttpResponseRedirect(self.success_url)
[docs]
class DomainDetailView(DomainContextMixin, DomainDevIdRegistrationTableMixin, ListInDetailView):
"""View to display domain details."""
[docs]
detail_model = DomainModel
[docs]
template_name = 'pki/domains/details.html'
[docs]
detail_context_object_name = 'domain'
[docs]
class DomainCaBulkDeleteConfirmView(DomainContextMixin, BulkDeleteView):
"""View to confirm the deletion of multiple Domains."""
[docs]
model = DomainModel
[docs]
success_url = reverse_lazy('pki:domains')
[docs]
ignore_url = reverse_lazy('pki:domains')
[docs]
template_name = 'pki/domains/confirm_delete.html'
[docs]
context_object_name = 'domains'
[docs]
def form_valid(self, form: Form) -> HttpResponse:
"""Attempt to delete domains if the form is valid."""
queryset = self.get_queryset()
deleted_count = queryset.count()
try:
response = super().form_valid(form)
except ProtectedError:
messages.error(
self.request, _('Cannot delete the selected Domain(s) because they are referenced by other objects.')
)
return HttpResponseRedirect(self.success_url)
except ValidationError as exc:
messages.error(self.request, exc.message)
return HttpResponseRedirect(self.success_url)
messages.success(self.request, _('Successfully deleted {count} Domains.').format(count=deleted_count))
return response
[docs]
class DevIdRegistrationCreateView(DomainContextMixin, FormView[DevIdRegistrationForm]):
"""View to create a new DevID Registration."""
[docs]
http_method_names = ('get', 'post')
[docs]
template_name = 'pki/devid_registration/add.html'
[docs]
def get_context_data(self, **kwargs: Any) -> dict[str, Any]:
"""Add additional context data."""
context = super().get_context_data(**kwargs)
context['domain'] = self.get_domain()
truststore_id = self.kwargs.get('truststore_id')
if truststore_id:
context['truststore'] = self.get_truststore(truststore_id)
else:
context['truststore'] = None
return context
[docs]
def get_initial(self) -> dict[str, Any]:
"""Initialize the form with default values."""
initial = super().get_initial()
domain = self.get_domain()
initial['domain'] = domain
truststore_id = self.kwargs.get('truststore_id')
if truststore_id:
initial['truststore'] = self.get_truststore(truststore_id)
else:
initial['truststore'] = None
return initial
[docs]
def get_domain(self) -> DomainModel:
"""Fetch the domain based on the primary key passed in the URL."""
try:
pk = self.kwargs.get('pk')
return DomainModel.objects.get(pk=pk)
except DomainModel.DoesNotExist as e:
exc_msg = 'This Domain does not exist.'
raise Http404(exc_msg) from e
[docs]
def get_truststore(self, truststore_id: int) -> TruststoreModel:
"""Fetch the domain based on the primary key passed in the URL."""
try:
return TruststoreModel.objects.get(pk=truststore_id)
except TruststoreModel.DoesNotExist as e:
exc_msg = 'This Domain does not exist.'
raise Http404(exc_msg) from e
[docs]
def get_success_url(self) -> str:
"""Return the URL to redirect to upon successful form submission."""
domain = self.get_domain()
return cast('str', reverse_lazy('pki:domains-config', kwargs={'pk': domain.id}))
[docs]
class DevIdRegistrationDeleteView(DomainContextMixin, DeleteView):
"""View to delete a DevID Registration."""
[docs]
model = DevIdRegistration
[docs]
template_name = 'pki/devid_registration/confirm_delete.html'
[docs]
success_url = reverse_lazy('pki:domains')
[docs]
def delete(self, request: HttpRequest, *args: Any, **kwargs: Any) -> HttpResponse:
"""Override delete method to add a success message."""
response = super().delete(request, *args, **kwargs)
messages.success(request, _('DevID Registration Pattern deleted successfully.'))
return response
[docs]
class DevIdMethodSelectView(DomainContextMixin, FormView):
"""View to select the method to add a DevID Registration pattern."""
[docs]
template_name = 'pki/devid_registration/method_select.html'
[docs]
def get_context_data(self, **kwargs: Any) -> dict[str, Any]:
"""Add additional context data."""
context = super().get_context_data(**kwargs)
context['domain'] = get_object_or_404(DomainModel, id=self.kwargs.get('pk'))
return context
[docs]
class IssuedCertificatesView(ContextDataMixin, ListView[CertificateModel]):
"""View to list certificates issued by a specific Issuing CA for a Domain."""
[docs]
model = CertificateModel
[docs]
template_name = 'pki/domains/issued_certificates.html'
[docs]
context_object_name = 'issued_certificates'
[docs]
def get_queryset(self) -> QuerySet[CertificateModel]:
"""Return only certificates associated with the domain's issued credentials."""
domain: DomainModel = self.get_domain()
# PyCharm TypeChecker issue - this passes mypy
# noinspection PyTypeChecker
# TODO(AlexHx8472): This must be limited to the actual domain.
return CertificateModel.objects.filter(
issuer_public_bytes=domain.issuing_ca.credential.certificate.subject_public_bytes
)
[docs]
def get_domain(self) -> DomainModel:
"""Get the domain object based on the URL parameter."""
domain_id = self.kwargs.get('pk')
return DomainModel.objects.get(pk=domain_id)
[docs]
def get_context_data(self, **kwargs: Any) -> dict[str, Any]:
"""Pass additional context data to the template."""
context = super().get_context_data(**kwargs)
domain = self.get_domain()
context['domain'] = domain
return context
[docs]
class OnboardingMethodSelectIdevidHelpView(DomainContextMixin, DetailView[DevIdRegistration]):
"""View to select the protocol for IDevID enrollment."""
[docs]
template_name = 'help/idevid_method_select.html'
[docs]
context_object_name = 'devid_registration'
[docs]
model = DevIdRegistration
[docs]
def get_context_data(self, **kwargs: Any) -> dict[str, Any]:
"""Add the required context for the template."""
context = super().get_context_data(**kwargs)
context['pk'] = self.object.pk
return context
[docs]
class OnboardingIdevidRegistrationHelpView(DomainContextMixin, DetailView[DevIdRegistration]):
"""Help view for the IDevID Registration, which displays the required OpenSSL commands."""
[docs]
http_method_names = ('get',)
[docs]
model = DevIdRegistration
[docs]
context_object_name = 'devid_registration'
[docs]
def get_context_data(self, **kwargs: Any) -> dict[str, Any]:
"""Adds information about the required OpenSSL commands to the context.
Args:
**kwargs: Keyword arguments passed to super().get_context_data.
Returns:
The context to render the page.
"""
context = super().get_context_data(**kwargs)
context['pk'] = self.kwargs.get('pk')
devid_registration: DevIdRegistration = self.object
if devid_registration.domain.public_key_info.public_key_algorithm_oid == oid.PublicKeyAlgorithmOid.RSA:
domain_credential_key_gen_command = (
f'openssl genrsa -out domain_credential_key.pem {devid_registration.domain.public_key_info.key_size}'
)
key_gen_command = f'openssl genrsa -out key.pem {devid_registration.domain.public_key_info.key_size}'
elif devid_registration.domain.public_key_info.public_key_algorithm_oid == oid.PublicKeyAlgorithmOid.ECC:
if not devid_registration.domain.public_key_info.named_curve:
raise Http404(NamedCurveMissingForEccErrorMsg)
domain_credential_key_gen_command = (
f'openssl ecparam -name {devid_registration.domain.public_key_info.named_curve.ossl_curve_name} '
f'-genkey -noout -out domain_credential_key.pem'
)
key_gen_command = (
f'openssl ecparam -name {devid_registration.domain.public_key_info.named_curve.ossl_curve_name} '
f'-genkey -noout -out key.pem'
)
else:
err_msg = 'Unsupported public key algorithm'
raise ValueError(err_msg)
ipv4_address = TlsSettings.get_first_ipv4_address()
context['host'] = f'{ipv4_address}:{self.request.META.get("SERVER_PORT", "443")}'
context['domain_credential_key_gen_command'] = domain_credential_key_gen_command
context['key_gen_command'] = key_gen_command
context['issuing_ca_pem'] = (
devid_registration.domain.get_issuing_ca_or_value_error().credential.get_certificate()
.public_bytes(encoding=serialization.Encoding.PEM)
.decode()
)
tls_cert = ActiveTrustpointTlsServerCredentialModel.objects.first()
if not tls_cert or not tls_cert.credential:
raise Http404(ActiveTrustpointTlsServerCredentialModelMissingErrorMsg)
context['trustpoint_server_certificate'] = (
tls_cert.credential.certificate.get_certificate_serializer().as_pem().decode('utf-8')
)
context['public_key_info'] = devid_registration.domain.public_key_info
context['domain'] = devid_registration.domain
return context
[docs]
class OnboardingCmpIdevidRegistrationHelpView(OnboardingIdevidRegistrationHelpView):
"""Help view for the CMP IDevID Registration, which displays the required OpenSSL commands."""
[docs]
template_name = 'help/cmp_idevid.html'
[docs]
class OnboardingEstIdevidRegistrationHelpView(OnboardingIdevidRegistrationHelpView):
"""Help view for the EST IDevID Registration, which displays the required OpenSSL commands."""
[docs]
template_name = 'help/est_idevid.html'