"""Views for EST (Enrollment over Secure Transport) handling authentication and certificate issuance."""
import base64
import ipaddress
import re
import enum
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any, ClassVar, Protocol, cast
from cryptography import x509
from cryptography.hazmat.primitives._serialization import Encoding
from cryptography.hazmat.primitives.asymmetric import ec, padding, rsa
from devices.issuer import (
LocalDomainCredentialIssuer,
LocalTlsClientCredentialIssuer,
LocalTlsServerCredentialIssuer,
OpcUaClientCredentialIssuer,
OpcUaServerCredentialIssuer,
)
from devices.models import (
DeviceModel,
IssuedCredentialModel,
OnboardingConfigModel,
OnboardingPkiProtocol,
OnboardingProtocol,
OnboardingStatus,
)
from django.http import HttpRequest, HttpResponse, HttpResponseBase
from django.utils.decorators import method_decorator
from django.views import View
from django.views.decorators.csrf import csrf_exempt
from pki.models.domain import DomainModel
from pki.util.idevid import IDevIDAuthenticationError, IDevIDAuthenticator
from pki.util.x509 import ApacheTLSClientCertExtractor, ClientCertificateAuthenticationError
from pyasn1.type.univ import ObjectIdentifier # type: ignore[import-untyped]
from trustpoint.logger import LoggerMixin
if TYPE_CHECKING:
from pki.models.credential import CredentialModel
[docs]
class UsernamePasswordAuthenticationError(Exception):
"""Exception raised for username and password authentication failures."""
[docs]
THRESHOLD_LOGGER: int = 400
[docs]
class LoggedHttpResponse(HttpResponse, LoggerMixin):
"""Custom HttpResponse that logs and prints error messages automatically."""
def __init__(self, content: str | bytes = b'', status: int | None = None, *args: Any, **kwargs: Any) -> None:
"""Initialize the LoggedHttpResponse instance.
Args:
content (Any): The content of the response.
status (Optional[int], optional): The HTTP status code of the response. Defaults to None.
*args (Any): Additional positional arguments passed to HttpResponse.
**kwargs (Any): Additional keyword arguments passed to HttpResponse.
"""
if status and status >= THRESHOLD_LOGGER:
if isinstance(content, bytes):
content = content.decode('utf-8')
self.logger.error('EST - ERROR - %s - %s', status, content)
else:
self.logger.info('EST - SUCCESS - %s', status)
super().__init__(content, *args, status=status, **kwargs)
[docs]
class Dispatchable(Protocol):
"""Protocol defining a dispatch method for handling HTTP requests."""
[docs]
def dispatch(self, request: HttpRequest, *args: Any, **kwargs: Any) -> HttpResponse:
"""Handle the dispatching of an HTTP request."""
...
@dataclass
[docs]
class CredentialRequest:
"""Encapsulates the details extracted from a CSR."""
[docs]
serial_number: str | None
[docs]
ipv4_addresses: list[ipaddress.IPv4Address]
[docs]
ipv6_addresses: list[ipaddress.IPv6Address]
[docs]
public_key: rsa.RSAPublicKey | ec.EllipticCurvePublicKey
[docs]
class EstAuthenticationMixin(LoggerMixin):
"""Checks for HTTP Basic Authentication before processing the request."""
[docs]
used_onboarding_protocol_auth: OnboardingProtocol | None = None
@staticmethod
[docs]
def authenticate_username_password(request: HttpRequest) -> DeviceModel:
"""Authenticate a user using HTTP Basic credentials and return associated DeviceModel.
:param request: Django HttpRequest containing the headers.
:return: Authenticated DeviceModel instance.
:raises UsernamePasswordAuthenticationError: if authentication fails.
"""
auth_header = request.headers.get('Authorization')
if not auth_header or not auth_header.startswith('Basic '):
error_message = 'Invalid auth header'
raise UsernamePasswordAuthenticationError(error_message)
try:
decoded_credentials = base64.b64decode(auth_header.split(' ', 1)[1].strip()).decode('utf-8')
username, password = decoded_credentials.split(':', 1)
except Exception as e:
error_message = 'Malformed authentication credentials'
raise UsernamePasswordAuthenticationError(error_message) from e
device = DeviceModel.objects.filter(common_name=username).first()
if not device:
error_message = 'Invalid authentication credentials'
raise UsernamePasswordAuthenticationError(error_message)
return device
[docs]
def authenticate_domain_credential(self, request: HttpRequest) -> DeviceModel:
"""Authenticate client using a Domain Credential TLS cert (Mutual TLS), return the associated DeviceModel."""
client_cert, _intermediary_cas = ApacheTLSClientCertExtractor.get_client_cert_as_x509(request)
try:
issued_credential = IssuedCredentialModel.get_credential_for_certificate(client_cert)
except IssuedCredentialModel.DoesNotExist as e:
raise ClientCertificateAuthenticationError from e
is_valid, reason = issued_credential.is_valid_domain_credential()
if not is_valid:
error_message = f'Invalid SSL_CLIENT_CERT header: {reason}'
raise ClientCertificateAuthenticationError(error_message)
return issued_credential.device
[docs]
def authenticate_reenrollment_application_credential(
self, request: HttpRequest, csr: x509.CertificateSigningRequest
) -> DeviceModel:
"""Authenticate client using an Application Credential. This is only allowed for reenrolling.
Only authenticates if subject and SAN in both client cert and CSR match the existing issued credential.
"""
client_cert, _intermediary_cas = ApacheTLSClientCertExtractor.get_client_cert_as_x509(request)
try:
issued_credential = IssuedCredentialModel.get_credential_for_certificate(client_cert)
except IssuedCredentialModel.DoesNotExist as e:
raise ClientCertificateAuthenticationError from e
credential_model: CredentialModel = issued_credential.credential
is_valid, reason = credential_model.is_valid_issued_credential()
if not is_valid:
error_message = f'Invalid SSL_CLIENT_CERT header: {reason}'
raise ClientCertificateAuthenticationError(error_message)
# RFC 7030: For reenrollment, client certificate and CSR subject/SAN must match the existing issued credential
if (not credential_model.certificate.subjects_match(csr.subject) or
not credential_model.certificate.subjects_match(client_cert.subject)):
error_message = 'CSR/client subject does not match the credential certificate subject'
raise ClientCertificateAuthenticationError(error_message)
try:
credential_cert = credential_model.certificate.get_certificate_serializer().as_crypto()
credential_cert_san = credential_cert.extensions.get_extension_for_class(x509.SubjectAlternativeName)
except x509.ExtensionNotFound:
credential_cert_san = None
try:
csr_san = csr.extensions.get_extension_for_class(x509.SubjectAlternativeName)
except x509.ExtensionNotFound:
csr_san = None
try:
client_san = client_cert.extensions.get_extension_for_class(x509.SubjectAlternativeName)
except x509.ExtensionNotFound:
client_san = None
if (client_san != csr_san or credential_cert_san != csr_san):
error_message = 'CSR/client SAN does not match the credential certificate subject'
raise ClientCertificateAuthenticationError(error_message)
return issued_credential.device
[docs]
def authenticate_request(
self, request: HttpRequest, domain: DomainModel, cert_template_str: str,
csr: x509.CertificateSigningRequest | None = None
) -> tuple[DeviceModel | None, LoggedHttpResponse | None]:
"""Authenticate the request and return a DeviceModel if authentication succeeds."""
if cert_template_str == 'domaincredential':
device, http_response = self._authenticate_domain_credential_request(request, domain)
else:
device, http_response = self._authenticate_application_certificate_request(request, domain, csr)
if device is None and http_response is None:
return None, LoggedHttpResponse('Authentication failed: No valid authentication method used', status=401)
return device, http_response
[docs]
def _authenticate_domain_credential_request(
self, request: HttpRequest, domain: DomainModel
) -> tuple[DeviceModel | None, LoggedHttpResponse | None]:
"""Authenticate requests for 'domaincredential' certificates and return the associated DeviceModel."""
try:
device = self.authenticate_username_password(request)
self.used_onboarding_protocol_auth = OnboardingProtocol.EST_USERNAME_PASSWORD
except UsernamePasswordAuthenticationError:
pass
else:
return device, None
try:
device_or_none = IDevIDAuthenticator.authenticate_idevid(request, domain)
self.used_onboarding_protocol_auth = OnboardingProtocol.EST_IDEVID
except IDevIDAuthenticationError as e:
return None, LoggedHttpResponse(f'Error validating the IDevID: {e!s}', status=500)
else:
return device_or_none, None
[docs]
def _authenticate_application_certificate_request(
self, request: HttpRequest, domain: DomainModel, csr: x509.CertificateSigningRequest | None
) -> tuple[DeviceModel | None, LoggedHttpResponse | None]:
"""Authenticate requests for application certificate templates and return the associated DeviceModel."""
__ = domain
if csr:
try:
device = self.authenticate_reenrollment_application_credential(request, csr)
except ClientCertificateAuthenticationError:
self.logger.exception('Reenroll application Client certificate authentication failed')
#pass
else:
self.logger.info('Reenroll application Client certificate authentication succeeded')
return device, None
try:
device = self.authenticate_username_password(request)
except UsernamePasswordAuthenticationError:
pass
else:
return device, None
try:
device = self.authenticate_domain_credential(request)
except ClientCertificateAuthenticationError as e:
return None, LoggedHttpResponse(f'Error validating the client certificate: {e!s}', status=500)
else:
return device, None
[docs]
class EstHttpMixin:
"""Mixin for processing HTTP requests for EST endpoints.
This mixin reads the raw message from the request, verifies that the payload:
- Does not exceed the maximum allowed size.
- Contains the expected content type.
- Is optionally decoded from base64 if required.
Upon successful validation, the mixin delegates the request handling to the parent dispatch method.
"""
[docs]
expected_content_type = 'application/pkcs10'
[docs]
max_payload_size = 131072
[docs]
def process_http_request(self, request: HttpRequest) -> tuple[bytes | None, LoggedHttpResponse | None]:
"""Process the incoming HTTP request for EST enrollment.
The method performs the following checks in order:
1. Reads the raw request message and ensures it does not exceed the maximum allowed size.
2. Verifies that the request contains a Content-Type header matching the expected type.
3. If the request includes a 'Content-Transfer-Encoding' header set to 'base64',
decodes the raw message from base64.
4. Delegates the remaining request processing to the parent class's dispatch method.
:param request: The incoming HttpRequest.
:return: An LoggedHttpResponse, either an error response or the result of the parent dispatch.
"""
self.raw_message = request.read()
if len(self.raw_message) > self.max_payload_size:
error_message = 'Message is too large.'
return None, LoggedHttpResponse(content=error_message, status=413)
if request.headers.get('Content-Transfer-Encoding', '').lower() == 'base64':
try:
self.raw_message = base64.b64decode(self.raw_message)
except Exception: # noqa: BLE001
error_message = 'Invalid base64 encoding in message.'
return None, LoggedHttpResponse(content=error_message, status=400)
return self.raw_message, None
[docs]
class EstRequestedDomainExtractorMixin:
"""Mixin to extract the requested domain.
This mixin sets:
- self.requested_domain: The DomainModel instance based on the 'domain' parameter.
- self.issuing_ca_certificate: The CA certificate for the requested domain.
- self.signature_suite: The signature suite derived from the CA certificate.
"""
[docs]
requested_domain: DomainModel | None
[docs]
def extract_requested_domain(self, domain_name: str) -> tuple[DomainModel | None, LoggedHttpResponse | None]:
"""Extracts the requested domain and sets the relevant certificate and signature suite.
:return: The response from the parent class's dispatch method.
"""
try:
requested_domain = DomainModel.objects.get(unique_name=domain_name)
except DomainModel.DoesNotExist:
return None, LoggedHttpResponse('Domain does not exist.', status=404)
else:
return requested_domain, None
[docs]
class EstPkiMessageSerializerMixin(LoggerMixin):
"""Mixin to handle serialization and deserialization of PKCS#10 certificate signing requests."""
[docs]
def deserialize_pki_message(self, data: bytes) -> tuple[
CredentialRequest | None, x509.CertificateSigningRequest | None, LoggedHttpResponse | None]:
"""Deserializes a DER-encoded PKCS#10 certificate signing request.
:param data: DER-encoded PKCS#10 request bytes.
:param requested_cert_template: Certificate template string.
:return: An CredentialRequest object.
:raises ValueError: If deserialization fails.
"""
try:
if b'CERTIFICATE REQUEST-----' in data:
request_format = 'pem'
csr = x509.load_pem_x509_csr(data)
elif re.match(rb'^[A-Za-z0-9+/=\n]+$', data):
request_format = 'base64_der'
der_data = base64.b64decode(data)
csr = x509.load_der_x509_csr(der_data)
elif data.startswith(b'\x30'): # ASN.1 DER should start with 0x30 (SEQUENCE tag)
request_format = 'der'
csr = x509.load_der_x509_csr(data)
else:
error_message = "Unsupported CSR format. Ensure it's PEM, Base64, or raw DER."
return None, None, LoggedHttpResponse(error_message, status=400)
except Exception as exception:
return None, None, LoggedHttpResponse(f'Failed to deserialize PKCS#10 certificate signing request {exception}',
status=500)
try:
self.verify_csr_signature(csr)
except Exception: # noqa: BLE001
return None, None, LoggedHttpResponse('Failed to verify PKCS#10 certificate signing request', status=500)
try:
cert_details = self.extract_details_from_csr(csr, request_format)
except Exception as e: # noqa: BLE001
return None, None, LoggedHttpResponse(f'Failed to extract information from CSR: {e}', status=500)
return cert_details, csr, None
[docs]
def verify_csr_signature(self, csr: x509.CertificateSigningRequest) -> None:
"""Verifies that the CSR's signature is valid by using the public key contained in the CSR.
Supports RSA, ECDSA, and DSA public keys.
"""
public_key = csr.public_key()
signature_hash_algorithm = csr.signature_hash_algorithm
if signature_hash_algorithm is None:
error_message = 'CSR signature hash algorithm is missing.'
raise ValueError(error_message)
if not isinstance(public_key, rsa.RSAPublicKey | ec.EllipticCurvePublicKey):
error_message = 'Unsupported public key type for CSR signature verification.'
raise TypeError(error_message)
try:
if isinstance(public_key, rsa.RSAPublicKey):
public_key.verify(
signature=csr.signature,
data=csr.tbs_certrequest_bytes,
padding=padding.PKCS1v15(),
algorithm=signature_hash_algorithm,
)
elif isinstance(public_key, ec.EllipticCurvePublicKey):
public_key.verify(
signature=csr.signature,
data=csr.tbs_certrequest_bytes,
signature_algorithm=ec.ECDSA(signature_hash_algorithm),
)
except Exception as e:
error_message = 'CSR signature verification failed.'
raise ValueError(error_message) from e
[docs]
class DeviceHandlerMixin:
"""Extract the serial number from an X.509 CSR and retrieve or create a DeviceModel instance.
This mixin assumes the CSR is already deserialized into a cryptography.x509.CertificateSigningRequest object.
"""
[docs]
def create_device_idevid(
self, credential_request: CredentialRequest, domain: DomainModel, cert_template: str
) -> DeviceModel:
"""Retrieves a DeviceModel instance using the serial number extracted from the provided CSR.
If a device with that serial number does not exist, a new one is created.
:param csr: A cryptography.x509.CertificateSigningRequest instance.
:param domain: The DomainModel instance associated with this device.
:param cert_template: The X509 Certificate Template to use for this device.
:return: A DeviceModel instance corresponding to the extracted serial number.
"""
device = DeviceModel(
serial_number=str(credential_request.serial_number),
common_name=credential_request.common_name,
domain=domain
)
if cert_template == 'domaincredential':
onboarding_config = OnboardingConfigModel(
onboarding_protocol=OnboardingProtocol.EST_IDEVID,
onboarding_status=OnboardingStatus.PENDING
)
onboarding_config.add_pki_protocol(OnboardingPkiProtocol.EST)
onboarding_config.full_clean()
onboarding_config.save()
device.onboarding_config = onboarding_config
device.full_clean()
device.save()
return device
[docs]
class CredentialIssuanceMixin:
"""Mixin to handle issuing credentials based on a given certificate template input.
Required inputs for the `issue_credential` method:
- cert_template_str: A string indicating the certificate template type.
Supported values: 'tls-server', 'tls-client', or 'domaincredential'.
- cert_template_class: The class responsible for issuing the credential.
- device: The device instance for which the credential is issued.
- domain: The domain instance used during credential issuance.
- csr: The certificate signing request (used only for 'domaincredential').
Additional parameters are used by the specific issuance methods:
- common_name: Used for 'tls-client' and 'tls-server' credentials.
- validity_days: Used for 'tls-client' and 'tls-server' credentials.
- ipv4_addresses, ipv6_addresses, domain_names: Used for 'tls-server' credentials.
"""
[docs]
cert_template_classes: ClassVar[dict[str, type]] = {
'tls-server': LocalTlsServerCredentialIssuer,
'tls-client': LocalTlsClientCredentialIssuer,
'opc-ua-server': OpcUaServerCredentialIssuer,
'opc-ua-client': OpcUaClientCredentialIssuer,
'domaincredential': LocalDomainCredentialIssuer,
}
[docs]
def _validate_subject_attributes(
self, subject_attributes: list[x509.NameAttribute[Any]], allowed_subject_oids: set[ObjectIdentifier]
) -> None:
"""Helper method to validate subject attributes."""
for attr in subject_attributes:
if attr.oid not in allowed_subject_oids:
oid_name = getattr(attr.oid, 'name', None) or attr.oid.dotted_string
error_message = f'Unsupported subject attribute: {oid_name}'
raise ValueError(error_message)
[docs]
def issue_credential(
self, cert_template_str: str, device: DeviceModel, domain: DomainModel, credential_request: CredentialRequest
) -> IssuedCredentialModel | None:
"""Issues a credential based on the specified certificate template and CSR.
This method handles the credential issuance process, which includes extracting
the necessary details from the CSR and domain, and then issuing the requested
certificate. The method supports both new certificate issuance and reenrollment.
Args:
cert_template_str (str): The certificate template string indicating the type
of certificate to issue (e.g., 'tls-server', 'tls-client', etc.).
device (DeviceModel): The device for which the certificate is being issued.
domain (DomainModel): The domain associated with the certificate issuance.
credential_request (CredentialRequest): A CredentialRequest object containing processed information
about the CSR
Returns:
IssuedCredentialModel: The issued credential model that contains the issued certificate and related data.
Raises:
ValueError: If the certificate template is invalid or any other error occurs during issuance.
"""
if cert_template_str not in self.cert_template_classes:
error_message = f'Unknown certificate template type: {cert_template_str}'
raise ValueError(error_message)
return self._issue_based_on_template(
cert_template_str=cert_template_str, credential_request=credential_request, device=device, domain=domain
)
[docs]
def _issue_simpleenroll(
self,
device: DeviceModel,
domain: DomainModel,
requested_cert_template_str: str,
credential_request: CredentialRequest,
) -> LoggedHttpResponse:
"""Handles the credential issuance and raises an error if issuance fails."""
try:
issued_credential: IssuedCredentialModel | None = self.issue_credential(
cert_template_str=requested_cert_template_str,
device=device,
domain=domain,
credential_request=credential_request,
)
except ValueError as e:
error_message = f'Error while issuing credential ({type(e).__name__}): {e!s}'
return LoggedHttpResponse(content=error_message, status=400)
except Exception as e: # noqa: BLE001
error_message = f'Error while issuing credential ({type(e).__name__}): {e!s}'
return LoggedHttpResponse(content=error_message, status=400)
if issued_credential is None:
return LoggedHttpResponse('Credential cannot be found', 400)
encoding = Encoding.DER if credential_request.request_format in {'der', 'base64_der'} else Encoding.PEM
cert_bytes = issued_credential.credential.get_certificate().public_bytes(encoding=encoding)
if credential_request.request_format == 'base64_der':
b64_cert = base64.b64encode(cert_bytes).decode('utf-8')
cert = '\n'.join([b64_cert[i:i + 64] for i in range(0, len(b64_cert), 64)])
content_type = 'application/pkix-cert'
elif credential_request.request_format == 'der':
cert = cert_bytes
content_type = 'application/pkix-cert'
else:
try:
cert = cert_bytes.decode('utf-8')
except UnicodeDecodeError:
cert = cert_bytes
content_type = 'application/x-pem-file'
if device.onboarding_config and requested_cert_template_str == 'domaincredential':
device.onboarding_config.onboarding_status = OnboardingStatus.ONBOARDED
device.save()
return LoggedHttpResponse(content=cert, status=200, content_type=content_type)
[docs]
def _issue_based_on_template(
self, cert_template_str: str, credential_request: CredentialRequest, device: DeviceModel, domain: DomainModel
) -> IssuedCredentialModel | None:
"""Issues the credential based on the selected template."""
if cert_template_str == 'domaincredential':
domain_credential = LocalDomainCredentialIssuer(device=device, domain=domain)
return domain_credential.issue_domain_credential_certificate(public_key=credential_request.public_key)
if cert_template_str == 'tls-client':
tls_client_credential = LocalTlsClientCredentialIssuer(device=device, domain=domain)
return tls_client_credential.issue_tls_client_certificate(
common_name=credential_request.common_name, public_key=credential_request.public_key, validity_days=365
)
if cert_template_str == 'tls-server':
tls_server_credential = LocalTlsServerCredentialIssuer(device=device, domain=domain)
return tls_server_credential.issue_tls_server_certificate(
common_name=credential_request.common_name,
ipv4_addresses=credential_request.ipv4_addresses,
ipv6_addresses=credential_request.ipv6_addresses,
domain_names=credential_request.dns_names,
san_critical=False,
public_key=credential_request.public_key,
validity_days=365,
)
if cert_template_str == 'opc-ua-client':
opc_ua_client_credential = OpcUaClientCredentialIssuer(device=device, domain=domain)
return opc_ua_client_credential.issue_opc_ua_client_certificate(
common_name=credential_request.common_name,
public_key=credential_request.public_key,
validity_days=365,
application_uri=credential_request.uniform_resource_identifiers
)
if cert_template_str == 'opc-ua-server':
opc_ua_server_credential = OpcUaServerCredentialIssuer(device=device, domain=domain)
return opc_ua_server_credential.issue_opc_ua_server_certificate(
common_name=credential_request.common_name,
ipv4_addresses=credential_request.ipv4_addresses,
ipv6_addresses=credential_request.ipv6_addresses,
domain_names=credential_request.dns_names,
public_key=credential_request.public_key,
validity_days=365,
application_uri=credential_request.uniform_resource_identifiers
)
return None
[docs]
class OnboardingMixin(LoggedHttpResponse):
"""A mixin that provides onboarding validation logic for issuing credentials."""
[docs]
def _validate_onboarding(
self, device: DeviceModel, credential_request: CredentialRequest, requested_cert_template_str: str
) -> LoggedHttpResponse | None:
"""Validates if the device's onboarding status is appropriate for credential issuance."""
try:
issued_credential = IssuedCredentialModel.objects.get(
device=device, common_name=credential_request.common_name
)
except IssuedCredentialModel.DoesNotExist:
issued_credential = None
if issued_credential:
return LoggedHttpResponse(
'A credential with the same CN already exists. Not allowed for method /simpleenroll', status=422
)
if requested_cert_template_str == 'domaincredential' and not device.onboarding_config:
return LoggedHttpResponse(
'Requested domain credential for device which does not require onboarding.', status=422
)
return None
@method_decorator(csrf_exempt, name='dispatch')
[docs]
class EstSimpleEnrollmentView(
EstAuthenticationMixin,
EstHttpMixin,
EstRequestedDomainExtractorMixin,
EstRequestedCertTemplateExtractorMixin,
EstPkiMessageSerializerMixin,
DeviceHandlerMixin,
CredentialIssuanceMixin,
OnboardingMixin,
LoggerMixin,
View,
):
"""Handles simple EST (Enrollment over Secure Transport) enrollment requests.
This view processes certificate signing requests (CSRs), authenticates the client using
either Mutual TLS or username/password, validates the device, and issues the requested certificate
based on the certificate template specified in the request.
"""
[docs]
def post(self, request: HttpRequest, *args: Any, **kwargs: Any) -> LoggedHttpResponse:
"""Handle POST requests for simple enrollment."""
self.logger.info('Request received: method=%s path=%s', request.method, request.path)
del args
credential_request = None
device: DeviceModel | None = None
requested_domain: DomainModel | None = None
requested_cert_template_str: str | None = None
raw_message, http_response = self.process_http_request(request)
if not http_response and raw_message:
domain_name = cast('str', kwargs.get('domain'))
requested_domain, http_response = self.extract_requested_domain(domain_name=domain_name)
if not http_response and raw_message and requested_domain:
cert_template = cast('str', kwargs.get('certtemplate'))
requested_cert_template_str, http_response = self.extract_cert_template(cert_template=cert_template)
if (not http_response and
raw_message and
requested_domain and
requested_cert_template_str):
device, http_response = self.authenticate_request(
request=self.request,
domain=requested_domain,
cert_template_str=requested_cert_template_str,
)
if not http_response:
credential_request, _csr, http_response = self.deserialize_pki_message(self.raw_message)
if not http_response and credential_request and requested_domain and requested_cert_template_str:
if not device:
device = self.create_device_idevid(
credential_request=credential_request,
domain=requested_domain,
cert_template=cert_template
)
if not device:
http_response = LoggedHttpResponse('Device not found and failed to create a new one.', status=500)
if not http_response and credential_request and device and requested_cert_template_str:
http_response = self._validate_onboarding(device=device,
credential_request=credential_request,
requested_cert_template_str=requested_cert_template_str)
if not http_response and credential_request and device and requested_domain and requested_cert_template_str:
http_response = self._issue_simpleenroll(device=device,
domain=requested_domain,
credential_request=credential_request,
requested_cert_template_str=requested_cert_template_str)
if not http_response:
http_response = LoggedHttpResponse('Something went wrong during EST simpleenroll.', status=500)
return http_response
@method_decorator(csrf_exempt, name='dispatch')
[docs]
class EstSimpleReEnrollmentView(EstAuthenticationMixin,
EstHttpMixin,
EstRequestedDomainExtractorMixin,
EstRequestedCertTemplateExtractorMixin,
EstPkiMessageSerializerMixin,
DeviceHandlerMixin,
CredentialIssuanceMixin,
OnboardingMixin,
LoggerMixin,
View):
"""Handles simple EST (Enrollment over Secure Transport) reenrollment requests.
This view processes certificate signing requests (CSRs), authenticates the client using
either Mutual TLS or username/password, validates the device, and issues the requested certificate
based on the certificate template specified in the request.
"""
[docs]
def post(self, request: HttpRequest, *args: Any, **kwargs: Any) -> LoggedHttpResponse:
"""Handle POST requests for simple enrollment."""
self.logger.info('Request received: method=%s path=%s', request.method, request.path)
del args
credential_request = None
device: DeviceModel | None = None
requested_domain: DomainModel | None = None
requested_cert_template_str: str | None = None
raw_message, http_response = self.process_http_request(request)
if not http_response and raw_message:
domain_name = cast('str', kwargs.get('domain'))
requested_domain, http_response = self.extract_requested_domain(domain_name=domain_name)
if not http_response and raw_message and requested_domain:
cert_template = cast('str', kwargs.get('certtemplate'))
requested_cert_template_str, http_response = self.extract_cert_template(cert_template=cert_template)
if not http_response:
credential_request, csr, http_response = self.deserialize_pki_message(self.raw_message)
if (not http_response and
csr and
requested_domain and
requested_cert_template_str):
device, http_response = self.authenticate_request(
request=self.request,
domain=requested_domain,
cert_template_str=requested_cert_template_str,
csr=csr
)
if not http_response and credential_request and device and requested_domain and requested_cert_template_str:
http_response = self._issue_simpleenroll(device=device,
domain=requested_domain,
credential_request=credential_request,
requested_cert_template_str=requested_cert_template_str)
if not http_response:
http_response = LoggedHttpResponse('Something went wrong during EST simplereenroll.', status=500)
return http_response
@method_decorator(csrf_exempt, name='dispatch')
[docs]
class EstCACertsView(EstAuthenticationMixin, EstRequestedDomainExtractorMixin, View, LoggerMixin):
"""View to handle the EST /cacerts endpoint.
Returns the CA certificate chain in a (simplified) PKCS#7 MIME format.
URL pattern should supply the 'domain' parameter (e.g., /cacerts/<domain>/)
"""
[docs]
def get(self, request: HttpRequest, *args: Any, **kwargs: Any) -> LoggedHttpResponse:
"""Handle GET requests for the /cacerts endpoint.
This method retrieves the CA certificate chain and returns it in PKCS#7 MIME format.
"""
self.logger.info('Request received: method=%s path=%s', request.method, request.path)
del request, args
requested_domain: DomainModel | None
try:
domain_name = cast('str', kwargs.get('domain'))
requested_domain, http_response = self.extract_requested_domain(domain_name=domain_name)
if not http_response and requested_domain:
if not requested_domain.issuing_ca:
return LoggedHttpResponse('The requested domain has no issuang ca configured', status=500)
ca_credential_serializer = requested_domain.issuing_ca.credential.get_credential_serializer()
pkcs7_certs = ca_credential_serializer.get_full_chain_as_serializer().as_pkcs7_der()
b64_pkcs7 = base64.b64encode(pkcs7_certs).decode()
formatted_b64_pkcs7 = '\n'.join([b64_pkcs7[i:i + 64] for i in range(0, len(b64_pkcs7), 64)])
http_response = LoggedHttpResponse(
formatted_b64_pkcs7.encode(),
status=200,
content_type='application/pkcs7-mime',
headers={'Content-Transfer-Encoding': 'base64'}
)
if 'Vary' in http_response:
del http_response['Vary']
if 'Content-Language' in http_response:
del http_response['Content-Language']
if not http_response:
http_response = LoggedHttpResponse('Something went wrong during EST getcacerts.', status=500)
except Exception as e: # noqa:BLE001
return LoggedHttpResponse(
f'Error retrieving CA certificates: {e!s}', status=500
)
else:
return http_response
@method_decorator(csrf_exempt, name='dispatch')
[docs]
class EstCsrAttrsView(View, LoggerMixin):
"""View to handle the EST /csrattrs endpoint.
This endpoint is not supported and returns 404 Not Found.
"""
[docs]
def get(self, request: HttpRequest, *args: Any, **kwargs: Any) -> HttpResponseBase:
"""Handle GET requests for the /csrattrs endpoint."""
self.logger.info('Request received: method=%s path=%s', request.method, request.path)
del request, args, kwargs
return LoggedHttpResponse(
'csrattrs/ is not supported', status=404
)