Source code for devices.views

"""This module contains all views concerning the devices application."""
from __future__ import annotations

import abc
import datetime
import io
from typing import TYPE_CHECKING, Any, cast

from django.contrib import messages
from django.contrib.auth.decorators import login_not_required
from django.core.paginator import Paginator
from django.db.models import Q, QuerySet
from django.http import FileResponse, Http404, HttpResponse, HttpResponseBase, HttpResponseRedirect
from django.shortcuts import redirect
from django.urls import reverse, reverse_lazy
from django.utils.decorators import method_decorator
from django.utils.html import format_html
from django.utils.translation import gettext_lazy, ngettext
from django.views.generic.base import RedirectView, TemplateView
from django.views.generic.detail import DetailView
from django.views.generic.edit import FormView
from django.views.generic.list import ListView
from pki.models.certificate import CertificateModel
from pki.models.credential import CredentialModel
from trustpoint_core.archiver import Archiver
from trustpoint_core.serializer import CredentialFileFormat
from util.mult_obj_views import get_primary_keys_from_str_as_list_of_ints

from devices.filters import DeviceFilter
from devices.forms import (
    BrowserLoginForm,
    ClmDeviceModelNoOnboardingForm,
    ClmDeviceModelOnboardingForm,
    CredentialDownloadForm,
    DeleteDevicesForm,
    IssueDomainCredentialForm,
    IssueOpcUaClientCredentialForm,
    IssueOpcUaServerCredentialForm,
    IssueTlsClientCredentialForm,
    IssueTlsServerCredentialForm,
    NoOnboardingCreateForm,
    OnboardingCreateForm,
    RevokeDevicesForm,
    RevokeIssuedCredentialForm,
)
from devices.issuer import (
    LocalDomainCredentialIssuer,
    LocalTlsClientCredentialIssuer,
    LocalTlsServerCredentialIssuer,
    OpcUaClientCredentialIssuer,
    OpcUaServerCredentialIssuer,
)
from devices.models import (
    DeviceModel,
    IssuedCredentialModel,
    NoOnboardingPkiProtocol,
    OnboardingPkiProtocol,
    OnboardingProtocol,
    OnboardingStatus,
    RemoteDeviceCredentialDownloadModel,
)
from devices.revocation import DeviceCredentialRevocation
from trustpoint.logger import LoggerMixin
from trustpoint.page_context import (
    DEVICES_PAGE_CATEGORY,
    DEVICES_PAGE_DEVICES_SUBCATEGORY,
    DEVICES_PAGE_OPC_UA_SUBCATEGORY,
    PageContextMixin,
)
from trustpoint.settings import UIConfig

if TYPE_CHECKING:
    import ipaddress
    from collections.abc import Sequence

    from django.http.request import HttpRequest
    from django.utils.safestring import SafeString

    # noinspection PyUnresolvedReferences
    from devices.forms import BaseCredentialForm

    # noinspection PyUnresolvedReferences
    from devices.issuer import BaseTlsCredentialIssuer

[docs] DeviceWithoutDomainErrorMsg = gettext_lazy('Device does not have an associated domain.')
[docs] NamedCurveMissingForEccErrorMsg = gettext_lazy('Failed to retrieve named curve for ECC algorithm.')
[docs] ActiveTrustpointTlsServerCredentialModelMissingErrorMsg = gettext_lazy( 'No active trustpoint TLS server credential found.' )
# This only occurs if no domain is configured
[docs] PublicKeyInfoMissingErrorMsg = DeviceWithoutDomainErrorMsg
# This must be removed in the future makeing use of the profile engine
[docs] ALLOWED_APP_CRED_PROFILES = [ {'profile': 'tls-server', 'label': 'TLS-Server Certficate'}, {'profile': 'tls-client', 'label': 'TLS-Client Certificate'}, {'profile': 'opc-ua-server', 'label': 'OPC-UA-Server Certificate'}, {'profile': 'opc-ua-client', 'label': 'OPC-UA-Client Certificate'}, ]
# -------------------------------------------------- Main Table Views --------------------------------------------------
[docs] class AbstractDeviceTableView(PageContextMixin, ListView[DeviceModel], abc.ABC): """Device Table View."""
[docs] http_method_names = ('get',)
[docs] model = DeviceModel
[docs] context_object_name = 'devices'
[docs] paginate_by = UIConfig.paginate_by
[docs] default_sort_param = 'common_name'
[docs] filterset_class = DeviceFilter
[docs] page_category = DEVICES_PAGE_CATEGORY
[docs] page_name: str
[docs] def apply_filters(self, qs: QuerySet[DeviceModel]) -> QuerySet[DeviceModel]: """Applies the `DeviceFilter` to the given queryset. Args: qs: The base queryset to filter. Returns: The filtered queryset according to GET parameters. """ self.filterset = DeviceFilter(self.request.GET, queryset=qs) return cast('QuerySet[DeviceModel]', self.filterset.qs)
[docs] def get(self, request: HttpRequest, *args: Any, **kwargs: Any) -> HttpResponse: """Adds the object model to the instance and forwards to super().get(). Args: request: The Django request object. *args: Positional arguments passed to super().get(). **kwargs: Keyword arguments passed to super().get(). Returns: The HttpResponse object returned by super().get(). """ sort_params = request.GET.getlist('sort', [self.default_sort_param]) if len(sort_params) > 1: first_sort_parameter = sort_params[0] query_dict = request.GET.copy() query_dict.setlist('sort', [first_sort_parameter]) new_url = f'{request.path}?{query_dict.urlencode()}' return HttpResponseRedirect(new_url) self.ordering = sort_params[0] return super().get(request, *args, **kwargs)
@abc.abstractmethod
[docs] def get_queryset(self) -> QuerySet[DeviceModel]: """Filter queryset to only include devices which are of generic type. Returns: Returns a queryset of all DeviceModels which are of generic type. """ ...
[docs] def get_context_data(self, **kwargs: Any) -> dict[str, Any]: """Adds the clm and revoke buttons to the context. Args: **kwargs: Keyword arguments passed to super().get_context_data. Returns: The context to use for rendering the devices page. """ context = super().get_context_data(**kwargs) sort_param = self.request.GET.get('sort', self.default_sort_param) context['current_sort'] = sort_param context['filter'] = getattr(self, 'filterset', None) params = self.request.GET.copy() params.pop('sort', None) context['preserve_qs'] = params.urlencode() for device in context['devices']: device.clm_button = self._get_clm_button_html(device) device.pki_protocols = self._get_pki_protocols(device) context['create_url'] = f'{self.page_category}:{self.page_name}_create' context['device_revoke_url'] = reverse(f'{self.page_category}:{self.page_name}_device_revoke') context['device_delete_url'] = reverse(f'{self.page_category}:{self.page_name}_device_delete') return context
[docs] def get_ordering(self) -> str | Sequence[str] | None: """Returns the sort parameters as a list. Returns: The sort parameters, if any. Otherwise the default sort parameter. """ return self.request.GET.getlist('sort', [self.default_sort_param])
[docs] def _get_clm_button_html(self, record: DeviceModel) -> SafeString: """Gets the HTML for the CLM button in the devices table. Args: record: The corresponding DeviceModel. Returns: The HTML of the hyperlink for the CLM button. """ clm_url = reverse( f'{self.page_category}:{self.page_name}_certificate_lifecycle_management', kwargs={'pk': record.pk} ) return format_html( '<a href="{}" class="btn btn-primary tp-table-btn w-100">{}</a>', clm_url, gettext_lazy('Manage') )
[docs] def _get_pki_protocols(self, record: DeviceModel) -> str: if record.onboarding_config: return ', '.join([str(p.label) for p in record.onboarding_config.get_pki_protocols()]) if record.no_onboarding_config: return ', '.join([str(p.label) for p in record.no_onboarding_config.get_pki_protocols()]) return ''
[docs] class DeviceTableView(AbstractDeviceTableView): """Device Table View."""
[docs] template_name = 'devices/devices.html'
[docs] page_name = DEVICES_PAGE_DEVICES_SUBCATEGORY
[docs] def get_queryset(self) -> QuerySet[DeviceModel]: """Filter queryset to only include devices which are of generic type and filtered by filtered by UI filters. Returns: Returns a queryset of all DeviceModels, filtered by UI filters. """ base_qs = super(ListView, self).get_queryset().filter( device_type=DeviceModel.DeviceType.GENERIC_DEVICE ) return self.apply_filters(base_qs)
[docs] class OpcUaGdsTableView(DeviceTableView): """Table View for devices where opc_ua_gds is True."""
[docs] template_name = 'devices/opc_ua_gds.html'
[docs] page_name = DEVICES_PAGE_OPC_UA_SUBCATEGORY
[docs] def get_queryset(self) -> QuerySet[DeviceModel]: """Filter queryset to only include devices which are of OPC-UA GDS type and filtered by UI filters. Returns: Returns a queryset of all DeviceModels which are of OPC-UA GDS type, filtered by UI filters. """ base_qs = super(ListView, self).get_queryset().filter( device_type=DeviceModel.DeviceType.OPC_UA_GDS ) return self.apply_filters(base_qs)
# ------------------------------------------------- Device Create View -------------------------------------------------
[docs] class AbstractCreateChooseOnboaringView(PageContextMixin, TemplateView): """Abstract view for choosing if the new device shall be onboarded or not."""
[docs] http_method_names = ('get',)
[docs] template_name = 'devices/create_choose_onboarding.html'
[docs] page_category = DEVICES_PAGE_CATEGORY
[docs] page_name: str
[docs] def get_context_data(self, **kwargs: Any) -> dict[str, Any]: """Adds the cancel url href according to the subcategory. Args: **kwargs: Keyword arguments passed to super().get_context_data. Returns: The context to use for rendering the devices page. """ context = super().get_context_data(**kwargs) context['cancel_create_url'] = f'devices:{self.page_name}' context['use_onboarding_url'] = f'{self.page_category}:{self.page_name}_create_onboarding' context['use_no_onboarding_url'] = f'{self.page_category}:{self.page_name}_create_no_onboarding' return context
[docs] class DeviceCreateChooseOnboardingView(AbstractCreateChooseOnboaringView): """View for choosing if the new device shall be onboarded or not."""
[docs] page_name = DEVICES_PAGE_DEVICES_SUBCATEGORY
[docs] class OpcUaGdsCreateChooseOnboardingView(AbstractCreateChooseOnboaringView): """View for choosing if the new OPC UA GDS shall be onboarded or not."""
[docs] page_name = DEVICES_PAGE_OPC_UA_SUBCATEGORY
[docs] class AbstractCreateNoOnboardingView(PageContextMixin, FormView[NoOnboardingCreateForm]): """asdfds."""
[docs] http_method_names = ('get', 'post')
[docs] form_class = NoOnboardingCreateForm
[docs] template_name = 'devices/create.html'
[docs] page_category = DEVICES_PAGE_CATEGORY
[docs] page_name: str
[docs] def get_context_data(self, **kwargs: Any) -> dict[str, Any]: """Adds the cancel url href according to the subcategory. Args: **kwargs: Keyword arguments passed to super().get_context_data. Returns: The context to use for rendering the devices page. """ context = super().get_context_data(**kwargs) context['cancel_create_url'] = f'{self.page_category}:{self.page_name}' return context
[docs] def form_valid(self, form: NoOnboardingCreateForm) -> HttpResponse: """Saves the form / creates the device model object. Args: form: The valid form. Returns: The HTTP Response to be returned. """ if self.page_name == DEVICES_PAGE_DEVICES_SUBCATEGORY: self.object = form.save(device_type=DeviceModel.DeviceType.GENERIC_DEVICE) else: self.object = form.save(device_type=DeviceModel.DeviceType.OPC_UA_GDS) return super().form_valid(form)
[docs] def get_success_url(self) -> str: """Gets the success url to redirect to after successful processing of the POST data following a form submit. Returns: The success url to redirect to after successful processing of the POST data following a form submit. """ return str( reverse_lazy( f'{self.page_category}:{self.page_name}_certificate_lifecycle_management', kwargs={'pk': self.object.id} ) )
[docs] class DeviceCreateNoOnboardingView(AbstractCreateNoOnboardingView): """Create form view for the devices section."""
[docs] page_name = DEVICES_PAGE_DEVICES_SUBCATEGORY
[docs] class OpcUaGdsCreateNoOnboardingView(AbstractCreateNoOnboardingView): """Create form view for the devices section."""
[docs] page_name = DEVICES_PAGE_OPC_UA_SUBCATEGORY
[docs] class AbstractCreateOnboardingView(PageContextMixin, FormView[OnboardingCreateForm]): """asdfds."""
[docs] http_method_names = ('get', 'post')
[docs] form_class = OnboardingCreateForm
[docs] template_name = 'devices/create.html'
[docs] page_category = DEVICES_PAGE_CATEGORY
[docs] page_name: str
[docs] def get_context_data(self, **kwargs: Any) -> dict[str, Any]: """Adds the cancel url href according to the subcategory. Args: **kwargs: Keyword arguments passed to super().get_context_data. Returns: The context to use for rendering the devices page. """ context = super().get_context_data(**kwargs) context['cancel_create_url'] = f'{self.page_category}:{self.page_name}' return context
[docs] def form_valid(self, form: OnboardingCreateForm) -> HttpResponse: """Saves the form / creates the device model object. Args: form: The valid form. Returns: The HTTP Response to be returned. """ if self.page_name == DEVICES_PAGE_DEVICES_SUBCATEGORY: self.object = form.save(device_type=DeviceModel.DeviceType.GENERIC_DEVICE) else: self.object = form.save(device_type=DeviceModel.DeviceType.OPC_UA_GDS) return super().form_valid(form)
[docs] def get_success_url(self) -> str: """Gets the success url to redirect to after successful processing of the POST data following a form submit. Returns: The success url to redirect to after successful processing of the POST data following a form submit. """ return str( reverse_lazy( f'{self.page_category}:{self.page_name}_certificate_lifecycle_management', kwargs={'pk': self.object.id} ) )
[docs] class DeviceCreateOnboardingView(AbstractCreateOnboardingView): """Create form view for the devices section."""
[docs] page_name = DEVICES_PAGE_DEVICES_SUBCATEGORY
[docs] class OpcUaGdsCreateOnboardingView(AbstractCreateOnboardingView): """Create form view for the devices section."""
[docs] page_name = DEVICES_PAGE_OPC_UA_SUBCATEGORY
# ------------------------------------------ Certificate Lifecycle Management ------------------------------------------
[docs] class AbstractCertificateLifecycleManagementSummaryView(PageContextMixin, DetailView[DeviceModel], abc.ABC): """This is the CLM summary view in the devices section."""
[docs] http_method_names = ('get', 'post')
[docs] model = DeviceModel
[docs] template_name = 'devices/credentials/certificate_lifecycle_management.html'
[docs] context_object_name = 'device'
[docs] default_sort_param = 'common_name'
[docs] issued_creds_qs: QuerySet[IssuedCredentialModel]
[docs] domain_credentials_qs: QuerySet[IssuedCredentialModel]
[docs] application_credentials_qs: QuerySet[IssuedCredentialModel]
[docs] page_category = DEVICES_PAGE_CATEGORY
[docs] page_name: str
[docs] def get_issued_creds_qs(self) -> QuerySet[IssuedCredentialModel]: """Gets a sorted queryset of all IssuedCredentialModels. Returns: Sorted queryset of all IssuedCredentialModels. """ issued_creds_qs = IssuedCredentialModel.objects.all() sort_param = self.request.GET.get('sort', self.default_sort_param) return issued_creds_qs.order_by(sort_param)
[docs] def get_domain_credentials_qs(self) -> QuerySet[IssuedCredentialModel]: """Gets a sorted queryset of all IssuedCredentialModels that are domain credentials. self.get_issued_creds_qs() must be called first! Returns: Sorted queryset of all IssuedCredentialModels that are domain credentials """ return self.issued_creds_qs.filter( Q(device=self.object) & Q(issued_credential_type=IssuedCredentialModel.IssuedCredentialType.DOMAIN_CREDENTIAL.value) )
[docs] def get_application_credentials_qs(self) -> QuerySet[IssuedCredentialModel]: """Gets a sorted queryset of all IssuedCredentialModels that are application credentials. self.get_issued_creds_qs() must be called first! Returns: Sorted queryset of all IssuedCredentialModels that are application credentials. """ return self.issued_creds_qs.filter( Q(device=self.object) & Q(issued_credential_type=IssuedCredentialModel.IssuedCredentialType.APPLICATION_CREDENTIAL.value) )
[docs] def get_context_data(self, **kwargs: Any) -> dict[str, Any]: """Adds the paginator and credential details to the context. Args: **kwargs: Keyword arguments passed to super().get_context_data. Returns: The context to use for rendering the clm summary page. """ self.issued_creds_qs = self.get_issued_creds_qs() self.domain_credentials_qs = self.get_domain_credentials_qs() self.application_credentials_qs = self.get_application_credentials_qs() context = super().get_context_data(**kwargs) context['domain_credentials'] = self.domain_credentials_qs context['application_credentials'] = self.application_credentials_qs paginator_domain = Paginator(self.domain_credentials_qs, UIConfig.paginate_by) page_number_domain = self.request.GET.get('page', 1) context['domain_credentials'] = paginator_domain.get_page(page_number_domain) context['is_paginated'] = paginator_domain.num_pages > 1 paginator_application = Paginator(self.application_credentials_qs, UIConfig.paginate_by) page_number_application = self.request.GET.get('page-a', 1) context['application_credentials'] = paginator_application.get_page(page_number_application) context['is_paginated_a'] = paginator_application.num_pages > 1 for cred in context['domain_credentials']: cred.expires_in = self._get_expires_in(cred) cred.expiration_date = cast('datetime.datetime', cred.credential.certificate.not_valid_after) cred.revoke = self._get_revoke_button_html(cred) for cred in context['application_credentials']: cred.expires_in = self._get_expires_in(cred) cred.expiration_date = cast('datetime.datetime', cred.credential.certificate.not_valid_after) cred.revoke = self._get_revoke_button_html(cred) context['main_url'] = f'{self.page_category}:{self.page_name}' context['issue_app_cred_no_onboarding_url'] = '' if ( self.object.domain and self.object.no_onboarding_config and self.object.no_onboarding_config.get_pki_protocols() ): context['issue_app_cred_no_onboarding_url'] = ( f'{self.page_category}:{self.page_name}_no_onboarding_clm_issue_application_credential' ) issue_domain_cred_onboarding_url = '' if self.object.onboarding_config: if self.object.onboarding_config.onboarding_protocol == OnboardingProtocol.CMP_SHARED_SECRET: issue_domain_cred_onboarding_url = ( f'{self.page_category}:{self.page_name}' '_certificate_lifecycle_management_issue_domain_credential_cmp_shared_secret' ) elif self.object.onboarding_config.onboarding_protocol == OnboardingProtocol.EST_USERNAME_PASSWORD: issue_domain_cred_onboarding_url = ( f'{self.page_category}:{self.page_name}' '_certificate_lifecycle_management_issue_domain_credential_est_username_password' ) context['issue_app_cred_onboarding_url'] = '' if self.object.domain and self.object.onboarding_config and self.object.onboarding_config.get_pki_protocols(): context['issue_app_cred_onboarding_url'] = ( f'{self.page_category}:{self.page_name}_onboarding_clm_issue_application_credential' ) context['issue_domain_cred_onboarding_url'] = issue_domain_cred_onboarding_url context['download_url'] = f'{self.page_category}:{self.page_name}_download' context['help_dispatch_domain_url'] = f'{self.page_category}:{self.page_name}_help_dispatch_domain' context['help_dispatch_device_type_url'] = f'{self.page_category}:{self.page_name}_help_dispatch_domain' context['pki_protocols'] = self._get_pki_protocols(self.object) context['OnboardingProtocol'] = OnboardingProtocol context['OnboardingPkiProtocol'] = OnboardingPkiProtocol context['NoOnboardingPkiProtocol'] = NoOnboardingPkiProtocol context['OnboardingStatus'] = OnboardingStatus context['device_form'] = self.get_device_form() return context
[docs] def get_onboarding_initial(self) -> dict[str, Any]: """Gets the initial values for onboarding. Returns: Initial values for onboarding. """ if not self.object.onboarding_config: err_msg = gettext_lazy('The device does not have onboarding configured.') raise ValueError(err_msg) return { 'common_name': self.object.common_name, 'serial_number': self.object.serial_number, 'domain': self.object.domain, 'onboarding_protocol': self.object.onboarding_config.onboarding_protocol, 'onboarding_status': OnboardingStatus(self.object.onboarding_config.onboarding_status).label, 'pki_protocol_cmp': self.object.onboarding_config.has_pki_protocol(OnboardingPkiProtocol.CMP), 'pki_protocol_est': self.object.onboarding_config.has_pki_protocol(OnboardingPkiProtocol.EST), }
[docs] def get_no_onboarding_initial(self) -> dict[str, Any]: """Gets the initial values for no onboarding. Returns: Initial values for no onboarding. """ if not self.object.no_onboarding_config: err_msg = gettext_lazy('The object has onboarding configured.') raise ValueError(err_msg) return { 'common_name': self.object.common_name, 'serial_number': self.object.serial_number, 'domain': self.object.domain, 'pki_protocol_cmp': self.object.no_onboarding_config.has_pki_protocol( NoOnboardingPkiProtocol.CMP_SHARED_SECRET ), 'pki_protocol_est': self.object.no_onboarding_config.has_pki_protocol( NoOnboardingPkiProtocol.EST_USERNAME_PASSWORD ), 'pki_protocol_manual': self.object.no_onboarding_config.has_pki_protocol(NoOnboardingPkiProtocol.MANUAL), }
[docs] def get_onboarding_form(self) -> ClmDeviceModelOnboardingForm: """Gets the form for onboarding. Returns: The onboarding form. """ return ClmDeviceModelOnboardingForm(initial=self.get_onboarding_initial(), instance=self.object)
[docs] def get_no_onboarding_form(self) -> ClmDeviceModelNoOnboardingForm: """Gets the form for no onboarding. Returns: The no onboarding form. """ if self.request.method == 'POST': return ClmDeviceModelNoOnboardingForm(self.request.POST, instance=self.object) return ClmDeviceModelNoOnboardingForm(initial=self.get_no_onboarding_initial(), instance=self.object)
[docs] def get_device_form(self) -> ClmDeviceModelOnboardingForm | ClmDeviceModelNoOnboardingForm: """Gets the device Form for onboarding or no onboarding. Returns: The required form. """ if self.object.onboarding_config: return self.get_onboarding_form() return self.get_no_onboarding_form()
@staticmethod
[docs] def _get_expires_in(record: IssuedCredentialModel) -> str: """Gets the remaining time until the credential expires as human-readable string. Args: record: The corresponding IssuedCredentialModel. Returns: The remaining time until the credential expires as human-readable string. """ if record.credential.certificate.certificate_status != CertificateModel.CertificateStatus.OK: return str(record.credential.certificate.certificate_status.label) now = datetime.datetime.now(datetime.UTC) expire_timedelta = record.credential.certificate.not_valid_after - now days = expire_timedelta.days hours, remainder = divmod(expire_timedelta.seconds, 3600) minutes, seconds = divmod(remainder, 60) return f'{days} days, {hours}:{minutes:02d}:{seconds:02d}'
[docs] def _get_revoke_button_html(self, record: IssuedCredentialModel) -> str: """Gets the HTML for the revoke button in the devices table. Args: record: The corresponding DeviceModel. Returns: The HTML of the hyperlink for the revoke button. """ if record.credential.certificate.certificate_status == CertificateModel.CertificateStatus.REVOKED: return format_html('<a class="btn btn-danger tp-table-btn w-100 disabled">{}</a>', gettext_lazy('Revoked')) url = reverse(f'{self.page_category}:{self.page_name}_credential_revoke', kwargs={'pk': record.pk}) return format_html('<a href="{}" class="btn btn-danger tp-table-btn w-100">{}</a>', url, gettext_lazy('Revoke'))
[docs] def _get_pki_protocols(self, record: DeviceModel) -> str: if record.onboarding_config: return ', '.join([str(p.label) for p in record.onboarding_config.get_pki_protocols()]) if record.no_onboarding_config: return ', '.join([str(p.label) for p in record.no_onboarding_config.get_pki_protocols()]) return ''
[docs] def post(self, request: HttpRequest, *_args: Any, **kwargs: Any) -> HttpResponse: """Handles the POST request used for device form submission. Args: request: The django request object. _args: Positional arguments are discarded. kwargs: Keyword arguments are passed to get_context_data. Returns: The HttpResponse. """ self.object = self.get_object() form: ClmDeviceModelOnboardingForm | ClmDeviceModelNoOnboardingForm if self.object.onboarding_config: form = ClmDeviceModelOnboardingForm(request.POST, instance=self.object) if form.is_valid(): form.save(onboarding_protocol=OnboardingProtocol(self.object.onboarding_config.onboarding_protocol)) else: form = ClmDeviceModelNoOnboardingForm(request.POST, instance=self.object) if form.is_valid(): form.save() context = self.get_context_data(object=self.object, **kwargs) return self.render_to_response(context)
[docs] class DeviceCertificateLifecycleManagementSummaryView(AbstractCertificateLifecycleManagementSummaryView): """Certificate Lifecycle Management Summary View for devices."""
[docs] page_name = DEVICES_PAGE_DEVICES_SUBCATEGORY
[docs] class OpcUaGdsCertificateLifecycleManagementSummaryView(AbstractCertificateLifecycleManagementSummaryView): """Certificate Lifecycle Management Summary View for OPC UA Devcies."""
[docs] page_name = DEVICES_PAGE_OPC_UA_SUBCATEGORY
# ------------------------------ Certificate Lifecycle Management - Credential Issuance -------------------------------
[docs] class AbstractNoOnboardingIssueNewApplicationCredentialView(PageContextMixin, DetailView[DeviceModel]): """abc."""
[docs] http_method_names = ('get',)
[docs] model = DeviceModel
[docs] context_object_name = 'device'
[docs] template_name = 'devices/credentials/issue_credential.html'
[docs] page_category = DEVICES_PAGE_CATEGORY
[docs] page_name: str
[docs] def get_context_data(self, **kwargs: Any) -> dict[str, Any]: """Add the sections to the context. Args: **kwargs: Keyword arguments are passed to super().get_context_data(**kwargs). Returns: The context data for the view. """ context = super().get_context_data(**kwargs) context['clm_url'] = f'{self.page_category}:{self.page_name}_certificate_lifecycle_management' context['heading'] = 'Issue New Application Credential' sections = [] if not self.object.no_onboarding_config: err_msg = gettext_lazy('Device is configured for onboarding') raise ValueError(err_msg) sections.append( { 'heading': gettext_lazy('CMP with OpenSSL (shared-secret)'), 'description': gettext_lazy( 'This option will guide you through all steps and commands that are ' 'required to issue a new application certificate ' 'using CMP with OpenSSL using a shared-secret (HMAC).' ), 'protocol': 'cmp-shared-secret', 'enabled': self.object.no_onboarding_config.has_pki_protocol(NoOnboardingPkiProtocol.CMP_SHARED_SECRET), 'url': f'{self.page_category}:{self.page_name}_no_onboarding_cmp_shared_secret_help', } ) sections.append( { 'heading': gettext_lazy('EST with OpenSSL and curL (username & password)'), 'description': gettext_lazy( 'This option will guide you through all steps and commands that are ' 'required to issue a new application certificate using EST using OpenSSL and curL.' ), 'protocol': 'est-username-password', 'enabled': self.object.no_onboarding_config.has_pki_protocol( NoOnboardingPkiProtocol.EST_USERNAME_PASSWORD ), 'url': f'{self.page_category}:{self.page_name}_no_onboarding_est_username_password_help', } ) sections.append( { 'heading': gettext_lazy('Manual Issuance'), 'description': gettext_lazy( 'This option will allow you to issue a new domain credential on the Trustpoint. ' 'The domain credential can then be downloaded for manual injection into the device, ' 'e.g., using a USB-stick.' ), 'protocol': 'manual', 'enabled': self.object.no_onboarding_config.has_pki_protocol(NoOnboardingPkiProtocol.MANUAL), 'url': f'{self.page_category}:{self.page_name}_no_onboarding_select_certificate_profile', } ) context['sections'] = sections return context
[docs] def _get_redirect_url(self) -> HttpResponseRedirect: return redirect(f'{self.page_category}:{self.page_name}_certificate_lifecycle_management', pk=self.object.pk)
[docs] def get(self, request: HttpRequest, *_args: Any, **_kwargs: Any) -> HttpResponse: """Adds checks if the device is configured for no-onboarding and has a domain set. Args: request: The django request object. *_args: Positional arguments are discarded. **_kwargs: Keyword arguments are discarded. Returns: The HttpResponse Or HttpRedirect to the CLM page. """ self.object = self.get_object() if not self.object.no_onboarding_config: err_msg = gettext_lazy('This device is configured for onboarding.') messages.warning(request, err_msg) return self._get_redirect_url() if not self.object.no_onboarding_config.get_pki_protocols(): err_msg = gettext_lazy( 'All PKI protocols for this device to request application certifciates are disabled.' ) messages.warning(request, err_msg) return self._get_redirect_url() if not self.object.domain: err_msg = gettext_lazy('No domain is configured for this device.') messages.warning(request, err_msg) return self._get_redirect_url() context = self.get_context_data(object=self.object) return self.render_to_response(context)
[docs] class DeviceNoOnboardingIssueNewApplicationCredentialView(AbstractNoOnboardingIssueNewApplicationCredentialView): """abc."""
[docs] page_name = DEVICES_PAGE_DEVICES_SUBCATEGORY
[docs] class OpcUaGdsNoOnboardingIssueNewApplicationCredentialView(AbstractNoOnboardingIssueNewApplicationCredentialView): """abc."""
[docs] page_name = DEVICES_PAGE_OPC_UA_SUBCATEGORY
[docs] class AbstractSelectCertificateProfileNewApplicationCredentialView(PageContextMixin, DetailView[DeviceModel]): """abc."""
[docs] http_method_names = ('get',)
[docs] model = DeviceModel
[docs] context_object_name = 'device'
[docs] template_name = 'devices/credentials/profile_select.html'
[docs] page_category = DEVICES_PAGE_CATEGORY
[docs] page_name: str
[docs] def get_context_data(self, **kwargs: Any) -> dict[str, Any]: """Add the sections to the context. Args: **kwargs: Keyword arguments are passed to super().get_context_data(**kwargs). Returns: The context data for the view. """ context = super().get_context_data(**kwargs) context['certificate_profiles'] = ALLOWED_APP_CRED_PROFILES for profile in context['certificate_profiles']: if profile['profile'] == 'tls-client': profile['url'] = ( f'{self.page_category}:{self.page_name}_certificate_lifecycle_management_issue_tls_client_credential' ) elif profile['profile'] == 'tls-server': profile['url'] = ( f'{self.page_category}:{self.page_name}_certificate_lifecycle_management_issue_tls_server_credential' ) elif profile['profile'] == 'opc-ua-client': profile['url'] = ( f'{self.page_category}:{self.page_name}_certificate_lifecycle_management_issue_opc_ua_client_credential' ) elif profile['profile'] == 'opc-ua-server': profile['url'] = ( f'{self.page_category}:{self.page_name}_certificate_lifecycle_management_issue_opc_ua_server_credential' ) return context
[docs] class DeviceSelectCertificateProfileNewApplicationCredentialView( AbstractSelectCertificateProfileNewApplicationCredentialView ): """abc."""
[docs] page_name = DEVICES_PAGE_DEVICES_SUBCATEGORY
[docs] class OpcUaGdsSelectCertificateProfileNewApplicationCredentialView( AbstractSelectCertificateProfileNewApplicationCredentialView ): """abc."""
[docs] page_name = DEVICES_PAGE_OPC_UA_SUBCATEGORY
[docs] class AbstractOnboardingIssueNewApplicationCredentialView(PageContextMixin, DetailView[DeviceModel]): """abc."""
[docs] http_method_names = ('get',)
[docs] model = DeviceModel
[docs] context_object_name = 'device'
[docs] template_name = 'devices/credentials/issue_credential.html'
[docs] page_category = DEVICES_PAGE_CATEGORY
[docs] page_name: str
[docs] def get_context_data(self, **kwargs: Any) -> dict[str, Any]: """Add the sections to the context. Args: **kwargs: Keyword arguments are passed to super().get_context_data(**kwargs). Returns: The context data for the view. """ context = super().get_context_data(**kwargs) context['clm_url'] = f'{self.page_category}:{self.page_name}_certificate_lifecycle_management' context['heading'] = 'Issue New Application Credential' sections = [] if not self.object.onboarding_config: err_msg = gettext_lazy('Device is not configured for onboarding') raise ValueError(err_msg) sections.append( { 'heading': gettext_lazy('CMP with Domain Credential'), 'description': gettext_lazy( 'This option will guide you through all steps and commands that are ' 'required to issue a new application certificate using CMP ' 'with OpenSSL using a shared-secret (HMAC).' ), 'protocol': 'cmp', 'enabled': self.object.onboarding_config.has_pki_protocol(OnboardingPkiProtocol.CMP), 'url': ( f'{self.page_category}:{self.page_name}_onboarding_clm_issue_application_credential_cmp_domain_credential' ), } ) sections.append( { 'heading': gettext_lazy('EST with Domain Credential'), 'description': gettext_lazy( 'This option will guide you through all steps and commands that are ' 'required to issue a new application certificate using EST using OpenSSL and curL.' ), 'protocol': 'est', 'enabled': self.object.onboarding_config.has_pki_protocol(OnboardingPkiProtocol.EST), 'url': ( f'{self.page_category}:{self.page_name}_onboarding_clm_issue_application_credential_est_domain_credential' ), } ) context['sections'] = sections return context
[docs] class DeviceOnboardingIssueNewApplicationCredentialView(AbstractOnboardingIssueNewApplicationCredentialView): """abc."""
[docs] page_name = DEVICES_PAGE_DEVICES_SUBCATEGORY
[docs] class OpcUaGdsOnboardingIssueNewApplicationCredentialView(AbstractOnboardingIssueNewApplicationCredentialView): """abc."""
[docs] page_name = DEVICES_PAGE_OPC_UA_SUBCATEGORY
[docs] class AbstractIssueCredentialView[FormClass: BaseCredentialForm, IssuerClass: BaseTlsCredentialIssuer]( PageContextMixin, DetailView[DeviceModel] ): """Base view for all credential issuance views."""
[docs] http_method_names = ('get', 'post')
[docs] model = DeviceModel
[docs] context_object_name = 'device'
[docs] template_name = 'devices/credentials/issue_application_credential.html'
[docs] form_class: type[FormClass]
[docs] issuer_class: type[IssuerClass]
[docs] friendly_name: str
[docs] page_category = DEVICES_PAGE_CATEGORY
[docs] page_name: str
[docs] def get_context_data(self, **kwargs: Any) -> dict[str, Any]: """Add the form to the context. Args: **kwargs: Keyword arguments are passed to super().get_context_data(**kwargs). Returns: The context data for the view. """ context = super().get_context_data(**kwargs) if 'form' not in kwargs: context['form'] = self.form_class(**self.get_form_kwargs()) context['clm_url'] = f'devices:{self.page_name}_certificate_lifecycle_management' return context
[docs] def get_form_kwargs(self) -> dict[str, Any]: """This method ads the concerning device model to the form kwargs and returns them. Returns: The form kwargs including the concerning device model. """ if self.object.domain is None: raise Http404(DeviceWithoutDomainErrorMsg) form_kwargs = { 'initial': self.issuer_class.get_fixed_values(device=self.object, domain=self.object.domain), 'prefix': None, 'device': self.object, } if self.request.method == 'POST': form_kwargs.update({'data': self.request.POST}) return form_kwargs
@abc.abstractmethod
[docs] def issue_credential(self, device: DeviceModel, cleaned_data: dict[str, Any]) -> IssuedCredentialModel: """Abstract method to issue a credential. Args: device: The device to be associated with the new credential. cleaned_data: The validated form data. Returns: The IssuedCredentialModel object that was created and saved. """
[docs] def post(self, request: HttpRequest, *_args: Any, **_kwargs: Any) -> HttpResponse: """Adds the object model to the instance and forwards to super().post(). Args: request: The Django request object is only used implicitly through self. *_args: Positional arguments are discarded. **_kwargs: Keyword arguments are discarded. Returns: The HttpResponseBase object returned by super().post(). """ self.object = self.get_object() form = self.form_class(**self.get_form_kwargs()) if form.is_valid(): credential = self.issue_credential(device=self.object, cleaned_data=form.cleaned_data) messages.success( request, f'Successfully issued {self.friendly_name} for device {credential.device.common_name}' ) return HttpResponseRedirect( reverse_lazy( f'devices:{self.page_name}_certificate_lifecycle_management', kwargs={'pk': self.get_object().id} ) ) return self.render_to_response(self.get_context_data(form=form))
[docs] class AbstractIssueDomainCredentialView( AbstractIssueCredentialView[IssueDomainCredentialForm, LocalDomainCredentialIssuer] ): """Base view for issuing domain credentials."""
[docs] form_class = IssueDomainCredentialForm
[docs] template_name = 'devices/credentials/issue_domain_credential.html'
[docs] issuer_class = LocalDomainCredentialIssuer
[docs] friendly_name = 'Domain Credential'
[docs] def issue_credential(self, device: DeviceModel, _cleaned_data: dict[str, Any]) -> IssuedCredentialModel: """Issue a domain credential for the device. Args: device: The device to issue the credential for. _cleaned_data: The validated form data is discarded. Returns: The issued credential model. """ if device.domain is None: err_msg = gettext_lazy('Device has no domain configured.') raise Http404(err_msg) issuer = self.issuer_class(device=device, domain=device.domain) return issuer.issue_domain_credential()
[docs] class DeviceIssueDomainCredentialView(AbstractIssueDomainCredentialView): """View for issuing domain credentials for devices."""
[docs] page_name = DEVICES_PAGE_DEVICES_SUBCATEGORY
[docs] class OpcUaGdsIssueDomainCredentialView(AbstractIssueDomainCredentialView): """View for issuing domain credentials for OPC-UA GDS devices."""
[docs] page_name = DEVICES_PAGE_OPC_UA_SUBCATEGORY
[docs] class AbstractIssueTlsClientCredentialView( AbstractIssueCredentialView[IssueTlsClientCredentialForm, LocalTlsClientCredentialIssuer] ): """View to issue a new TLS client credential."""
[docs] form_class = IssueTlsClientCredentialForm
[docs] issuer_class = LocalTlsClientCredentialIssuer
[docs] friendly_name = 'TLS client credential'
[docs] page_name: str
[docs] def issue_credential(self, device: DeviceModel, cleaned_data: dict[str, Any]) -> IssuedCredentialModel: """Issues an TLS client credential. Args: device: The device to be associated with the new credential. cleaned_data: The validated form data. Returns: The IssuedCredentialModel object that was created and saved. """ common_name = cast('str', cleaned_data.get('common_name')) validity = cast('int', cleaned_data.get('validity')) if not device.domain: raise Http404(DeviceWithoutDomainErrorMsg) issuer = self.issuer_class(device=device, domain=device.domain) return issuer.issue_tls_client_credential(common_name=common_name, validity_days=validity)
[docs] class DeviceIssueTlsClientCredentialView(AbstractIssueTlsClientCredentialView): """Issue a new TLS client credential within the devices section."""
[docs] page_name = DEVICES_PAGE_DEVICES_SUBCATEGORY
[docs] class OpcUaGdsIssueTlsClientCredentialView(AbstractIssueTlsClientCredentialView): """Issue a new TLS client credential within the devices section."""
[docs] page_name = DEVICES_PAGE_OPC_UA_SUBCATEGORY
[docs] class AbstractIssueTlsServerCredentialView( AbstractIssueCredentialView[IssueTlsServerCredentialForm, LocalTlsServerCredentialIssuer] ): """View to issue a new TLS server credential."""
[docs] form_class = IssueTlsServerCredentialForm
[docs] issuer_class = LocalTlsServerCredentialIssuer
[docs] friendly_name = 'TLS server credential'
[docs] def issue_credential(self, device: DeviceModel, cleaned_data: dict[str, Any]) -> IssuedCredentialModel: """Issues an TLS server credential. Args: device: The device to be associated with the new credential. cleaned_data: The validated form data. Returns: The IssuedCredentialModel object that was created and saved. """ common_name = cast('str', cleaned_data.get('common_name')) if not common_name: err_msg = 'Common name is missing. Cannot issue credential.' raise Http404(err_msg) if not device.domain: raise Http404(DeviceWithoutDomainErrorMsg) issuer = self.issuer_class(device=device, domain=device.domain) return issuer.issue_tls_server_credential( common_name=common_name, ipv4_addresses=cast('list[ipaddress.IPv4Address]', cleaned_data.get('ipv4_addresses')), ipv6_addresses=cast('list[ipaddress.IPv6Address]', cleaned_data.get('ipv6_addresses')), domain_names=cast('list[str]', cleaned_data.get('domain_names')), san_critical=False, validity_days=cast('int', cleaned_data.get('validity')), )
[docs] class DeviceIssueTlsServerCredentialView(AbstractIssueTlsServerCredentialView): """Issues a TLS server credenital within the devices section."""
[docs] page_name = DEVICES_PAGE_DEVICES_SUBCATEGORY
[docs] class OpcUaGdsIssueTlsServerCredentialView(AbstractIssueTlsServerCredentialView): """Issues a TLS server credenital within the devices section."""
[docs] page_name = DEVICES_PAGE_OPC_UA_SUBCATEGORY
[docs] class AbstractIssueOpcUaClientCredentialView( AbstractIssueCredentialView[IssueOpcUaClientCredentialForm, OpcUaClientCredentialIssuer] ): """View to issue a new OPC UA client credential."""
[docs] form_class = IssueOpcUaClientCredentialForm
[docs] issuer_class = OpcUaClientCredentialIssuer
[docs] friendly_name = 'OPC UA client credential'
[docs] def issue_credential(self, device: DeviceModel, cleaned_data: dict[str, Any]) -> IssuedCredentialModel: """Issues an OPC UA client credential. Args: device: The device to be associated with the new credential. cleaned_data: The validated form data. Returns: The IssuedCredentialModel object that was created and saved. """ if not device.domain: raise Http404(DeviceWithoutDomainErrorMsg) issuer = self.issuer_class(device=device, domain=device.domain) return issuer.issue_opc_ua_client_credential( common_name=cast('str', cleaned_data.get('common_name')), application_uri=cast('str', cleaned_data.get('application_uri')), validity_days=cast('int', cleaned_data.get('validity')), )
[docs] class DeviceIssueOpcUaClientCredentialView(AbstractIssueOpcUaClientCredentialView): """Issues an OPC UA client credential within the devices section."""
[docs] page_name = DEVICES_PAGE_DEVICES_SUBCATEGORY
[docs] class OpcUaGdsIssueOpcUaClientCredentialView(AbstractIssueOpcUaClientCredentialView): """Issues an OPC UA client credential within the devices section."""
[docs] page_name = DEVICES_PAGE_OPC_UA_SUBCATEGORY
[docs] class AbstractIssueOpcUaServerCredentialView( AbstractIssueCredentialView[IssueOpcUaServerCredentialForm, OpcUaServerCredentialIssuer] ): """View to issue a new OPC UA server credential."""
[docs] form_class = IssueOpcUaServerCredentialForm
[docs] issuer_class = OpcUaServerCredentialIssuer
[docs] friendly_name = 'OPC UA server credential'
[docs] def issue_credential(self, device: DeviceModel, cleaned_data: dict[str, Any]) -> IssuedCredentialModel: """Issues an OPC UA server credential. Args: device: The device to be associated with the new credential. cleaned_data: The validated form data. Returns: The IssuedCredentialModel object that was created and saved. """ common_name = cast('str', cleaned_data.get('common_name')) if not common_name: err_msg = 'Common name is missing. Cannot issue credential.' raise Http404(err_msg) if not device.domain: raise Http404(DeviceWithoutDomainErrorMsg) issuer = self.issuer_class(device=device, domain=device.domain) ipv4_addresses: list[ipaddress.IPv4Address] = cleaned_data.get('ipv4_addresses', []) ipv6_addresses: list[ipaddress.IPv6Address] = cleaned_data.get('ipv6_addresses', []) domain_names: list[str] = cleaned_data.get('domain_names', []) validity_days: int = cleaned_data.get('validity', 0) return issuer.issue_opc_ua_server_credential( common_name=common_name, application_uri=cast('str', cleaned_data.get('application_uri')), ipv4_addresses=ipv4_addresses, ipv6_addresses=ipv6_addresses, domain_names=domain_names, validity_days=validity_days, )
[docs] class DeviceIssueOpcUaServerCredentialView(AbstractIssueOpcUaServerCredentialView): """Issues an OPC UA server credential within the devices section."""
[docs] page_name = DEVICES_PAGE_DEVICES_SUBCATEGORY
[docs] class OpcUaGdsIssueOpcUaServerCredentialView(AbstractIssueOpcUaServerCredentialView): """Issues an OPC UA server credential within the devices section."""
[docs] page_name = DEVICES_PAGE_OPC_UA_SUBCATEGORY
# -------------------------------- Certificate Lifecycle Management - Token Auth Mixin --------------------------------
[docs] class DownloadTokenRequiredAuthenticationMixin: """Mixin which checks the token included in the URL for browser download views."""
[docs] credential_download: RemoteDeviceCredentialDownloadModel
[docs] def dispatch(self, request: HttpRequest, *args: Any, **kwargs: Any) -> HttpResponseBase: """Checks the validity of the token included in the URL for browser download views and redirects if invalid. Args: request: The django request object. *args: Positional arguments passed to super().dispatch(). **kwargs: Keyword arguments passed to super().dispatch(). Returns: A Django HttpResponseBase object. """ super_dispatch = getattr(super(), 'dispatch', None) if not callable(super_dispatch): err_msg = 'Internal server error. Failed to get super().dispatch().' raise Http404(err_msg) token = request.GET.get('token') try: self.credential_download = RemoteDeviceCredentialDownloadModel.objects.get( issued_credential_model=kwargs.get('pk') ) except RemoteDeviceCredentialDownloadModel.DoesNotExist: messages.warning(request, 'Invalid download token.') return redirect('devices:browser_login') if not token or not self.credential_download.check_token(token): messages.warning(request, 'Invalid download token.') return redirect('devices:browser_login') return cast('HttpResponseBase', super_dispatch(request, *args, **kwargs))
# ----------------------------------- Certificate Lifecycle Management - Downloads ------------------------------------
[docs] class AbstractDownloadPageDispatcherView(PageContextMixin, RedirectView): """Redirects depending on the type of credential, that is if a private key is available or not."""
[docs] http_method_names = ('get',)
[docs] model: type[IssuedCredentialModel] = IssuedCredentialModel
[docs] permanent = False
[docs] page_category = DEVICES_PAGE_CATEGORY
[docs] page_name: str
[docs] def get_redirect_url(self, *_args: Any, **kwargs: Any) -> str: """Gets the redirection URL depending on the type credential, that is if a private key is available or not. Args: *_args: Positional arguments are discarded. **kwargs: The pk parameter is retrieved and expected to be there. Returns: The redirect URL. """ pk = kwargs.get('pk') # This can only happen if the path for the URL defined in urls.py does not contain <int:pk>. # This would mean we, the dev team, introduced a bug. if pk is None or not isinstance(pk, int): err_msg = 'An unexpected error occurred. Please see logs for more information.' raise Http404(err_msg) issued_credential = IssuedCredentialModel.objects.filter(pk=pk).first() if issued_credential is None: messages.error( self.request, 'No credential found for the given primary key. See logs for more information.' ) return reverse(f'devices:{self.page_name}_certificate_lifecycle_management', kwargs={'pk': pk}) if issued_credential.credential.private_key: return reverse(f'devices:{self.page_name}_credential-download', kwargs={'pk': pk}) return reverse(f'devices:{self.page_name}_certificate-download', kwargs={'pk': pk})
[docs] class DeviceDownloadPageDispatcherView(AbstractDownloadPageDispatcherView): """Download dispatcher view for the device pages."""
[docs] page_name = DEVICES_PAGE_DEVICES_SUBCATEGORY
[docs] class OpcUaGdsDownloadPageDispatcherView(AbstractDownloadPageDispatcherView): """Download dispatcher view for the OPC UA GDS pages."""
[docs] page_name = DEVICES_PAGE_OPC_UA_SUBCATEGORY
# --------------------------------------------- Certificate Download Help ----------------------------------------------
[docs] class AbstractCertificateDownloadView(PageContextMixin, DetailView[IssuedCredentialModel]): """View for downloading certificates."""
[docs] http_method_names = ('get',)
[docs] model: type[IssuedCredentialModel] = IssuedCredentialModel
[docs] template_name = 'devices/credentials/certificate_download.html'
[docs] context_object_name = 'issued_credential'
[docs] page_category = DEVICES_PAGE_CATEGORY
[docs] page_name: str
[docs] def get_context_data(self, **kwargs: Any) -> dict[str, Any]: """Add the clm_url to the context. Args: **kwargs: Keyword arguments are passed to super().get_context_data(**kwargs). Returns: The context data for the view. """ context = super().get_context_data(**kwargs) context['clm_url'] = f'{self.page_category}:{self.page_name}_certificate_lifecycle_management' return context
[docs] class DeviceCertificateDownloadView(AbstractCertificateDownloadView): """Certificate download view for the device pages."""
[docs] page_name = DEVICES_PAGE_DEVICES_SUBCATEGORY
[docs] class OpcUaGdsCertificateDownloadView(AbstractCertificateDownloadView): """Certificate download view for the OPC UA GDS pages."""
[docs] page_name = DEVICES_PAGE_OPC_UA_SUBCATEGORY
# ---------------------------------------------- Credential Download Help ----------------------------------------------
[docs] class AbstractDeviceBaseCredentialDownloadView(PageContextMixin, DetailView[IssuedCredentialModel]): """View to download a password protected application credential in the desired format. Inherited by the domain and application credential download views. It is not intended for direct use. """
[docs] http_method_names = ('get', 'post')
[docs] model = IssuedCredentialModel
[docs] template_name = 'devices/credentials/credential_download.html'
[docs] context_object_name = 'credential'
[docs] form_class = CredentialDownloadForm
[docs] is_browser_download = False
[docs] page_category = DEVICES_PAGE_CATEGORY
[docs] page_name: str
[docs] def get_context_data(self, **kwargs: Any) -> dict[str, Any]: """Adds information about the credential to the context. Args: **kwargs: Keyword arguments are passed to super().get_context_data(**kwargs). Returns: The context data for the view. """ context = super().get_context_data(**kwargs) issued_credential = self.object credential = issued_credential.credential if credential.credential_type != CredentialModel.CredentialTypeChoice.ISSUED_CREDENTIAL: # sanity check err_msg = 'Credential is not an issued credential.' raise Http404(err_msg) credential_purpose = IssuedCredentialModel.IssuedCredentialPurpose( issued_credential.issued_credential_purpose ).label domain_credential_value = IssuedCredentialModel.IssuedCredentialType.DOMAIN_CREDENTIAL.value application_credential_value = IssuedCredentialModel.IssuedCredentialType.APPLICATION_CREDENTIAL.value if issued_credential.issued_credential_type == domain_credential_value: context['credential_type'] = credential_purpose elif issued_credential.issued_credential_type == application_credential_value: context['credential_type'] = credential_purpose + ' Credential' else: err_msg = 'Unknown IssuedCredentialType' raise Http404(err_msg) context['FileFormat'] = CredentialFileFormat.__members__ context['is_browser_dl'] = self.is_browser_download context['show_browser_dl'] = not self.is_browser_download context['issued_credential'] = issued_credential if 'form' not in kwargs: context['form'] = self.form_class() else: context['form'] = kwargs['form'] context['browser_otp_url'] = f'devices:{self.page_name}_browser_otp_view' context['clm_url'] = f'devices:{self.page_name}_certificate_lifecycle_management' return context
[docs] def post(self, _request: HttpRequest, *_args: Any, **_kwargs: Any) -> HttpResponse: """Processing the valid form data. This will use the contained form data to start the download process of the desired file. Args: _request: The django request object. *_args: Positional arguments are discarded. **_kwargs: Keyword arguments are discarded. Returns: If successful, this will start the file download. Otherwise, a Http404 will be raised and displayed. """ self.object = self.get_object() form = self.form_class(self.request.POST) if form.is_valid(): password = form.cleaned_data['password'].encode() try: file_format = CredentialFileFormat(self.request.POST.get('file_format')) except ValueError as exception: err_msg = gettext_lazy('Unknown file format.') raise Http404(err_msg) from exception credential_model = self.object.credential credential_serializer = credential_model.get_credential_serializer() private_key_serializer = credential_serializer.get_private_key_serializer() certificate_serializer = credential_serializer.get_certificate_serializer() cert_collection_serializer = credential_serializer.get_additional_certificates_serializer() if not private_key_serializer or not certificate_serializer or not cert_collection_serializer: raise Http404 credential_purpose = IssuedCredentialModel.IssuedCredentialPurpose( self.object.issued_credential_purpose ).label credential_type_name = credential_purpose.replace(' ', '-').lower().replace('-credential', '') if file_format == CredentialFileFormat.PKCS12: file_stream_data = io.BytesIO(credential_serializer.as_pkcs12(password=password)) elif file_format == CredentialFileFormat.PEM_ZIP: file_data = Archiver.archive_zip( data_to_archive={ 'private_key.pem': private_key_serializer.as_pkcs8_pem(password=password), 'certificate.pem': certificate_serializer.as_pem(), 'certificate_chain.pem': cert_collection_serializer.as_pem(), } ) file_stream_data = io.BytesIO(file_data) elif file_format == CredentialFileFormat.PEM_TAR_GZ: file_data = Archiver.archive_tar_gz( data_to_archive={ 'private_key.pem': private_key_serializer.as_pkcs8_pem(password=password), 'certificate.pem': certificate_serializer.as_pem(), 'certificate_chain.pem': cert_collection_serializer.as_pem(), } ) file_stream_data = io.BytesIO(file_data) else: err_msg = gettext_lazy('Unknown file format.') raise Http404(err_msg) response = FileResponse( file_stream_data, content_type=file_format.mime_type, as_attachment=True, filename=f'trustpoint-{credential_type_name}-credential{file_format.file_extension}', ) return cast('HttpResponse', response) return self.render_to_response(self.get_context_data(form=form))
[docs] class DeviceManualCredentialDownloadView(AbstractDeviceBaseCredentialDownloadView): """View to download a password protected domain or application credential in the desired format."""
[docs] page_name = DEVICES_PAGE_DEVICES_SUBCATEGORY
@method_decorator(login_not_required, name='dispatch')
[docs] class DeviceBrowserCredentialDownloadView( DownloadTokenRequiredAuthenticationMixin, AbstractDeviceBaseCredentialDownloadView ): """View to download a password protected domain or app credential in the desired format from a remote client."""
[docs] is_browser_download = True
[docs] class AbstractBrowserOnboardingOTPView(PageContextMixin, DetailView[IssuedCredentialModel]): """View to display the OTP for remote credential download (aka. browser onboarding)."""
[docs] http_method_names = ('get',)
[docs] model = IssuedCredentialModel
[docs] template_name = 'devices/credentials/onboarding/browser/otp_view.html'
[docs] redirection_view = 'devices:devices'
[docs] context_object_name = 'credential'
[docs] page_category = DEVICES_PAGE_CATEGORY
[docs] page_name: str
[docs] def get_context_data(self, **kwargs: Any) -> dict[str, Any]: """Adds information about the credential and otp for the browser download process. Args: **kwargs: Keyword arguments passed to super().get_context_data. Returns: The context to render the page. """ credential = self.get_object() device = credential.device context = super().get_context_data(**kwargs) try: cdm = RemoteDeviceCredentialDownloadModel.objects.get(issued_credential_model=credential, device=device) cdm.delete() except RemoteDeviceCredentialDownloadModel.DoesNotExist: pass cdm = RemoteDeviceCredentialDownloadModel(issued_credential_model=credential, device=device) cdm.save() context.update( { 'device_name': device.common_name, 'device_id': device.id, 'credential_id': credential.id, 'otp': cdm.get_otp_display(), 'download_url': self.request.build_absolute_uri(reverse('devices:browser_login')), } ) context['cred_download_url'] = f'devices:{self.page_name}_credential-download' context['browser_cancel'] = f'devices:{self.page_name}_browser_cancel' return context
[docs] class DeviceBrowserOnboardingOTPView(AbstractBrowserOnboardingOTPView): """The browser onboarding OTP view for the devices section."""
[docs] page_name = DEVICES_PAGE_DEVICES_SUBCATEGORY
[docs] class OpcUaGdsBrowserOnboardingOTPView(AbstractBrowserOnboardingOTPView): """The browser onboarding OTP view for the OPC UA GDS section."""
[docs] page_name = DEVICES_PAGE_OPC_UA_SUBCATEGORY
[docs] class AbstractBrowserOnboardingCancelView(PageContextMixin, DetailView[IssuedCredentialModel]): """View to cancel the browser onboarding process and delete the associated RemoteDeviceCredentialDownloadModel."""
[docs] http_method_names = ('get',)
[docs] model = IssuedCredentialModel
[docs] context_object_name = 'credential'
[docs] permanent = False
[docs] page_category = DEVICES_PAGE_CATEGORY
[docs] page_name: str
[docs] def get(self, request: HttpRequest, *_args: Any, **_kwargs: Any) -> HttpResponse: """Cancels the browser onboarding process and deletes the associated RemoteDeviceCredentialDownloadModel. Args: request: The Django request object. *_args: Positional arguments are discarded. **_kwargs: Keyword arguments are discarded. Returns: The HttpResponseBase object with the desired redirection URL. """ self.object = self.get_object() try: cdm = RemoteDeviceCredentialDownloadModel.objects.get( issued_credential_model=self.object, device=self.object.device ) cdm.delete() messages.info(request, 'The browser onboarding process was canceled.') except RemoteDeviceCredentialDownloadModel.DoesNotExist: pass return HttpResponseRedirect( reverse_lazy(f'devices:{self.page_name}_credential-download', kwargs={'pk': self.object.id}) )
[docs] class DeviceBrowserOnboardingCancelView(AbstractBrowserOnboardingCancelView): """Cancels the browser onboarding for the devices section."""
[docs] page_name = DEVICES_PAGE_DEVICES_SUBCATEGORY
[docs] class OpcUaGdsBrowserOnboardingCancelView(AbstractBrowserOnboardingCancelView): """Cancels the browser onboarding for the OPC UA GDS section."""
[docs] page_name = DEVICES_PAGE_OPC_UA_SUBCATEGORY
@method_decorator(login_not_required, name='dispatch')
[docs] class DeviceOnboardingBrowserLoginView(FormView[BrowserLoginForm]): """View to handle certificate download requests."""
[docs] http_method_names = ('get', 'post')
[docs] template_name = 'devices/credentials/onboarding/browser/login.html'
[docs] form_class = BrowserLoginForm
[docs] cleaned_data: dict[str, Any]
[docs] def get_success_url(self) -> str: """Gets the success url to redirect to after successful processing of the POST data following a form submit. Returns: The success url to redirect to after successful processing of the POST data following a form submit. """ credential_id = cast('int', self.cleaned_data.get('credential_id')) credential_download = cast('RemoteDeviceCredentialDownloadModel', self.cleaned_data.get('credential_download')) token: str = credential_download.download_token return ( f'{reverse_lazy("devices:browser_domain_credential_download", kwargs={"pk": credential_id})}?token={token}' )
[docs] def form_invalid(self, form: BrowserLoginForm) -> HttpResponse: """Adds an error message in the case of an invalid OTP. Args: form: The corresponding form object. Returns: The Django HttpResponse object. """ messages.error(self.request, gettext_lazy('The provided password is not valid.')) return super().form_invalid(form)
[docs] def form_valid(self, form: BrowserLoginForm) -> HttpResponse: """Performed if the form was validated successfully and adds the cleaned data to the instance. Args: form: The corresponding form object. Returns: The Django HttpResponse object. """ self.cleaned_data = form.cleaned_data return super().form_valid(form)
# ---------------------------------------- Revocation Views ----------------------------------------
[docs] class AbstractIssuedCredentialRevocationView(PageContextMixin, DetailView[IssuedCredentialModel]): """Revokes a specific issued credential."""
[docs] http_method_names = ('get', 'post')
[docs] model = IssuedCredentialModel
[docs] template_name = 'devices/confirm_revoke.html'
[docs] context_object_name = 'issued_credential'
[docs] pk_url_kwarg = 'pk'
[docs] form_class = RevokeIssuedCredentialForm
[docs] page_category = DEVICES_PAGE_CATEGORY
[docs] page_name: str
[docs] def get_context_data(self, **kwargs: Any) -> dict[str, Any]: """Adds the primary keys to the context. Args: kwargs: Keyword arguments are passed to super().get_context_data(**kwargs). Returns: The context data. """ context = super().get_context_data(**kwargs) context['revoke_form'] = self.form_class() context['cert'] = self.object.credential.certificate context['cred_revoke_url'] = f'{self.page_category}:{self.page_name}_credential_revoke' return context
[docs] def post(self, _request: HttpRequest, *_args: Any, **_kwargs: Any) -> HttpResponse: """Will try to revoke the requested issued credential. Args: request: The Django request object. *_args: Positional arguments are discarded. **_kwargs: Keyword arguments are discarded. Returns: Redirect to the devices summary. """ self.object = self.get_object() reverse_path = reverse( f'{self.page_category}:{self.page_name}_certificate_lifecycle_management', kwargs={'pk': self.object.device.pk}, ) revoke_form = self.form_class(self.request.POST) if revoke_form.is_valid(): revocation_reason = revoke_form.cleaned_data['revocation_reason'] status = self.object.credential.certificate.certificate_status if status == CertificateModel.CertificateStatus.EXPIRED: msg = gettext_lazy('Credential is already expired. Cannot revoke expired certificates.') messages.error(self.request, msg) return redirect(reverse_path) if status == CertificateModel.CertificateStatus.REVOKED: msg = gettext_lazy('Certificate is already revoked. Cannot revoke a revoked certificate again.') return redirect(reverse_path) revoked_successfully, _ = DeviceCredentialRevocation.revoke_certificate(self.object.id, revocation_reason) if revoked_successfully: msg = gettext_lazy('Successfully revoked one active credential.') messages.success(self.request, msg) else: messages.error( self.request, gettext_lazy('Failed to revoke certificate. See logs for more information.') ) return redirect(reverse_path)
[docs] class DeviceIssuedCredentialRevocationView(AbstractIssuedCredentialRevocationView): """abc."""
[docs] page_name = DEVICES_PAGE_DEVICES_SUBCATEGORY
[docs] class OpcUaGdsIssuedCredentialRevocationView(AbstractIssuedCredentialRevocationView): """abc."""
[docs] page_name = DEVICES_PAGE_OPC_UA_SUBCATEGORY
[docs] class AbstractBulkRevokeView(LoggerMixin, PageContextMixin, ListView[DeviceModel]): """View to confirm the deletion of multiple Devices."""
[docs] model = DeviceModel
[docs] template_name = 'devices/confirm_bulk_revoke.html'
[docs] context_object_name = 'devices'
[docs] missing: str = ''
[docs] pks: str = ''
[docs] queryset: QuerySet[DeviceModel]
[docs] form_class = RevokeDevicesForm
[docs] page_category = DEVICES_PAGE_CATEGORY
[docs] page_name: str
[docs] def get_context_data(self, **kwargs: Any) -> dict[str, Any]: """Adds the primary keys to the context. Args: kwargs: Keyword arguments are passed to super().get_context_data(**kwargs). Returns: The context data. """ context = super().get_context_data(**kwargs) context['pks'] = self.pks context['revoke_form'] = self.form_class(initial={'pks': self.pks}) context['device_revoke_url'] = f'{self.page_category}:{self.page_name}_device_revoke' return context
[docs] def get_queryset(self) -> QuerySet[DeviceModel]: """Gets the queryset of DeviceModel objects that are requested to be revoked. Returns: The queryset of DeviceModel objects that are requested to be revoked. """ if not self.pks: self.pks = self.kwargs.get('pks', '') pks_list = get_primary_keys_from_str_as_list_of_ints(pks=self.pks) qs = DeviceModel.objects.filter(pk__in=pks_list) found_pks = set(qs.values_list('pk', flat=True)) missing = set(pks_list) - found_pks self.missing = ', '.join(str(pk) for pk in missing) return qs
[docs] def _set_queryset(self, request: HttpRequest) -> str | None: try: self.queryset = self.get_queryset() except ValueError: err_msg_template = gettext_lazy('Please select the devices you would like to revoke.') err_msg = err_msg_template.format(pks=self.pks) messages.error(request, err_msg) return 'devices:devices' except Exception: err_msg_template = gettext_lazy( f'Failed to retrieve the queryset for primary keys: {self.pks}.See logs for more details.' ) err_msg = err_msg_template.format(pks=self.pks) self.logger.exception(err_msg) messages.error(request, err_msg) return 'devices:devices' if self.missing: err_msg_template = gettext_lazy(f'Devices for the following primary keys were not found: {self.pks}.') err_msg = err_msg_template.format(pks=self.missing) messages.error(request, err_msg) return 'devices:devices' return None
[docs] def get(self, request: HttpRequest, *args: Any, **kwargs: Any) -> HttpResponse: """HTTP GET processing. Args: request: The Django request object. *args: Positional arguments passed to super().get(). **kwargs: Keyword arguments passed to super().get(). Returns: The device deletion view or a redirect to the devices view if one or more pks were not found. """ redirect_name = self._set_queryset(request) if redirect_name: return redirect(redirect_name) messages.warning( request, gettext_lazy('This operation will revoke ALL certificates associated with the selected devices.') ) return super().get(request, *args, **kwargs)
[docs] def post(self, request: HttpRequest, *_args: Any, **_kwargs: Any) -> HttpResponse: """Will try to revoke all certificate assiciated with the requested DeviceModel records. Args: request: The Django request object. *_args: Positional arguments are discarded. **_kwargs: Keyword arguments are discarded. Returns: Redirect to the devices summary. """ revoke_form = self.form_class(self.request.POST) if revoke_form.is_valid(): self.pks = revoke_form.cleaned_data['pks'] revocation_reason = revoke_form.cleaned_data['revocation_reason'] redirect_name = self._set_queryset(request) if redirect_name: return redirect(redirect_name) now = datetime.datetime.now(datetime.UTC) issued_credentials_to_revoke_qs = IssuedCredentialModel.objects.filter( device__in=self.queryset, credential__certificate__revoked_certificate__isnull=True, credential__certificate__not_valid_after__gte=now, ) n_revoked = 0 for credential in issued_credentials_to_revoke_qs: revoked_successfully, _ = DeviceCredentialRevocation.revoke_certificate( credential.id, revocation_reason ) if revoked_successfully: n_revoked += 1 if n_revoked > 0: msg = ngettext( 'Successfully revoked one active credential.', 'Successfully revoked %(count)d active credentials.', n_revoked, ) % {'count': n_revoked} messages.success(self.request, msg) else: messages.error(self.request, gettext_lazy('No credentials were revoked.')) return redirect('devices:devices')
[docs] class DeviceBulkRevokeView(AbstractBulkRevokeView): """abc."""
[docs] page_name = DEVICES_PAGE_DEVICES_SUBCATEGORY
[docs] class OpcUaGdsBulkRevokeView(AbstractBulkRevokeView): """abc."""
[docs] page_name = DEVICES_PAGE_DEVICES_SUBCATEGORY
[docs] class AbstractBulkDeleteView(LoggerMixin, PageContextMixin, ListView[DeviceModel]): """View to confirm the deletion of multiple Devices."""
[docs] model = DeviceModel
[docs] template_name = 'devices/confirm_delete.html'
[docs] context_object_name = 'devices'
[docs] missing: str = ''
[docs] pks: str = ''
[docs] queryset: QuerySet[DeviceModel]
[docs] form_class = DeleteDevicesForm
[docs] page_category = DEVICES_PAGE_CATEGORY
[docs] page_name: str
[docs] def get_context_data(self, **kwargs: Any) -> dict[str, Any]: """Adds the primary keys to the context. Args: kwargs: Keyword arguments are passed to super().get_context_data(**kwargs). Returns: The context data. """ context = super().get_context_data(**kwargs) context['pks'] = self.pks context['delete_form'] = self.form_class(initial={'pks': self.pks}) context['device_delete_url'] = f'{self.page_category}:{self.page_name}_device_delete' return context
[docs] def get_queryset(self) -> QuerySet[DeviceModel]: """Gets the queryset of DeviceModel objects that are requested to be deleted. Returns: The queryset of DeviceModel objects that are requested to be deleted. """ if not self.pks: self.pks = self.kwargs.get('pks', '') pks_list = get_primary_keys_from_str_as_list_of_ints(pks=self.pks) qs = DeviceModel.objects.filter(pk__in=pks_list) found_pks = set(qs.values_list('pk', flat=True)) missing = set(pks_list) - found_pks self.missing = ', '.join(str(pk) for pk in missing) return qs
[docs] def _set_queryset(self, request: HttpRequest) -> str | None: try: self.queryset = self.get_queryset() except ValueError: err_msg_template = gettext_lazy('Please select the devices you would like to delete.') err_msg = err_msg_template.format(pks=self.pks) messages.error(request, err_msg) return 'devices:devices' except Exception: err_msg_template = gettext_lazy( f'Failed to retrieve the queryset for primary keys: {self.pks}.See logs for more details.' ) err_msg = err_msg_template.format(pks=self.pks) self.logger.exception(err_msg) messages.error(request, err_msg) return 'devices:devices' if self.missing: err_msg_template = gettext_lazy(f'Devices for the following primary keys were not found: {self.pks}.') err_msg = err_msg_template.format(pks=self.missing) messages.error(request, err_msg) return 'devices:devices' return None
[docs] def get(self, request: HttpRequest, *args: Any, **kwargs: Any) -> HttpResponse: """HTTP GET processing. Args: request: The Django request object. *args: Positional arguments passed to super().get(). **kwargs: Keyword arguments passed to super().get(). Returns: The device deletion view or a redirect to the devices view if one or more pks were not found. """ redirect_name = self._set_queryset(request) if redirect_name: return redirect(redirect_name) messages.warning( request, gettext_lazy('This operation will revoke ALL certificates associated with the selected devices.') ) return super().get(request, *args, **kwargs)
[docs] def post(self, request: HttpRequest, *_args: Any, **_kwargs: Any) -> HttpResponse: """HTTP POST processing which will try to delete all requested DeviceModel records. Args: request: The Django request object. *_args: Positional arguments are discarded. **_kwargs: Keyword arguments are discarded. Returns: Redirect to the devices summary. """ delete_form = self.form_class(self.request.POST) if delete_form.is_valid(): self.pks = delete_form.cleaned_data['pks'] redirect_name = self._set_queryset(request) if redirect_name: return redirect(redirect_name) try: count, _ = self.queryset.delete() success_msg_template = gettext_lazy( 'Successfully deleted {count} devices. All corresponding certificates have been revoked.' ) success_msg = success_msg_template.format(count=count) messages.success(request, success_msg) except Exception: err_msg = 'Failed to delete DeviceModel records.' self.logger.exception(err_msg) messages.error( request, gettext_lazy('Failed to delete DeviceModel records. See logs for more information.') ) return redirect('devices:devices')
[docs] class DeviceBulkDeleteView(AbstractBulkDeleteView): """abc."""
[docs] page_name = DEVICES_PAGE_DEVICES_SUBCATEGORY
[docs] class OpcUaGdsBulkDeleteView(AbstractBulkDeleteView): """abc."""
[docs] page_name = DEVICES_PAGE_OPC_UA_SUBCATEGORY