"""This module contains the CMP endpoints (views)."""
from __future__ import annotations
import datetime
import enum
import ipaddress
import secrets
from typing import TYPE_CHECKING, Protocol, cast, get_args
from aoki.views import AokiServiceMixin
from cryptography import x509
from cryptography.exceptions import InvalidSignature
from cryptography.hazmat.primitives import hashes, hmac
from cryptography.hazmat.primitives.asymmetric import ec, padding, rsa
from cryptography.hazmat.primitives.serialization import Encoding, load_der_public_key, load_pem_private_key
from cryptography.x509.oid import ExtensionOID
from devices.issuer import (
LocalDomainCredentialIssuer,
LocalTlsClientCredentialIssuer,
LocalTlsServerCredentialIssuer,
OpcUaClientCredentialIssuer,
OpcUaServerCredentialIssuer,
)
from devices.models import (
DeviceModel,
IssuedCredentialModel,
NoOnboardingPkiProtocol,
OnboardingPkiProtocol,
OnboardingProtocol,
OnboardingStatus,
)
from django.http import HttpResponse
from django.utils.decorators import method_decorator
from django.views.decorators.csrf import csrf_exempt
from django.views.generic.base import View
from pki.models.domain import DomainModel
from pki.util.idevid import IDevIDAuthenticator
from pyasn1.codec.der import decoder, encoder # type: ignore[import-untyped]
from pyasn1.type import tag, univ, useful # type: ignore[import-untyped]
from pyasn1_modules import rfc2459, rfc2511, rfc4210 # type: ignore[import-untyped]
from trustpoint_core.crypto_types import PublicKey
from trustpoint_core.oid import AlgorithmIdentifier, HashAlgorithm, HmacAlgorithm, SignatureSuite
from cmp.util import NameParser
if TYPE_CHECKING:
from typing import Any, TypeGuard
from cryptography.hazmat.primitives.asymmetric.types import PublicKeyTypes
from devices.models import NoOnboardingConfigModel, OnboardingConfigModel
from django.http import HttpRequest
from pki.models.credential import CredentialModel
[docs]
UTC_TIME_THRESHOLD = 2050
[docs]
UTC_TIME_CORRECTION = 100
[docs]
CERT_TEMPLATE_VERSION = 2
[docs]
DEFAULT_VALIDITY_DAYS = 10
[docs]
CMP_MESSAGE_VERSION = 2
[docs]
SENDER_NONCE_LENGTH = 16
[docs]
TRANSACTION_ID_LENGTH = 16
[docs]
def is_supported_public_key(public_key: PublicKeyTypes) -> TypeGuard[PublicKey]:
"""TypeGuard function that narrows down the public key type.
Args:
public_key: The loaded public key to check if it is supported.
Returns:
True if it is supported, False otherwise.
"""
return isinstance(public_key, get_args(PublicKey))
[docs]
def load_supported_public_key_type(der_bytes: bytes) -> PublicKey:
"""Tries to load the public key from bytes and checks if it is a supported key.
Args:
der_bytes: The bytes containing the key.
Raises:
ValueError: If loading of the public key failed.
TypeError: If the loaded public key is of an unsupported type.
Returns:
The loaded public key.
"""
try:
loaded_key = load_der_public_key(der_bytes)
except Exception as exception:
err_msg = 'Failed to load private key.'
raise ValueError(err_msg) from exception
if not is_supported_public_key(loaded_key):
err_msg = f'Key of type {type(loaded_key)} found, but expected one of {PublicKey}.'
raise TypeError(err_msg)
return loaded_key
[docs]
class ApplicationCertificateTemplateNames(enum.Enum):
"""Application Certificate Template."""
[docs]
TLS_CLIENT = 'tls-client'
[docs]
TLS_SERVER = 'tls-server'
[docs]
OPCUA_SERVER = 'opc-ua-server'
[docs]
OPCUA_CLIENT = 'opc-ua-client'
[docs]
IMPLICIT_CONFIRM_OID = '1.3.6.1.5.5.7.4.13'
[docs]
IMPLICIT_CONFIRM_STR_VALUE = '0x0500'
[docs]
class Dispatchable(Protocol):
"""Dispatchable Protocol."""
[docs]
def dispatch(self, request: HttpRequest, *args: Any, **kwargs: Any) -> HttpResponse:
"""Dispatch method."""
...
[docs]
class CmpHttpMixin:
"""CMP Http Validations."""
[docs]
expected_content_type = 'application/pkixcmp'
[docs]
max_payload_size = 131072 # max 128 KiByte
[docs]
def dispatch(self, request: HttpRequest, *args: Any, **kwargs: Any) -> HttpResponse:
"""Dispatch method."""
self.raw_message = request.read()
if len(self.raw_message) > self.max_payload_size:
return HttpResponse('Message is too large.', status=413)
content_type = request.headers.get('Content-Type')
if content_type is None:
return HttpResponse('Message is missing the content type.', status=415)
if content_type != self.expected_content_type:
return HttpResponse(
f'Message does not have the expected content type: {self.expected_content_type}.', status=415
)
parent = cast('Dispatchable', super())
return parent.dispatch(request, *args, **kwargs)
[docs]
class CmpRequestedDomainExtractorMixin:
"""Domain name extractor."""
[docs]
requested_domain: DomainModel
[docs]
is_aoki: bool = False
[docs]
def dispatch(self, request: HttpRequest, *args: Any, **kwargs: Any) -> HttpResponse:
"""Dispatch method."""
domain_name = cast('str', kwargs.get('domain'))
if domain_name == '.aoki' and '/initialization/.aoki' in request.path: # basing this on URL is hacky
self.is_aoki = True
else:
try:
self.requested_domain = DomainModel.objects.get(unique_name=domain_name)
except DomainModel.DoesNotExist:
return HttpResponse('Domain does not exist.', status=404)
parent = cast('Dispatchable', super())
return parent.dispatch(request, *args, **kwargs)
[docs]
class CmpPkiMessageSerializerMixin:
"""CMP message serialization."""
[docs]
serialized_pyasn1_message: None | rfc4210.PKIMessage
[docs]
def dispatch(self, request: HttpRequest, *args: Any, **kwargs: Any) -> HttpResponse:
"""Dispatch method."""
try:
self.serialized_pyasn1_message, _ = decoder.decode(self.raw_message, asn1Spec=rfc4210.PKIMessage())
except (ValueError, TypeError):
return HttpResponse('Failed to parse the CMP message. Seems to be corrupted.', status=400)
parent = cast('Dispatchable', super())
return parent.dispatch(request, *args, **kwargs)
[docs]
class CmpResponseBuilderMixin:
"""Mixin for CMP response message building shared between request types."""
@staticmethod
[docs]
def _parse_san_extension(cert_req_template: rfc2511.CertTemplate) -> dict[str, Any]:
"""Parses the (mandatory) SAN extension from the certificate request template.
Returns a dictionary with the following keys:
- 'dns_names': List of DNS/domain names.
- 'ipv4_addresses': List of IPv4 addresses.
- 'ipv6_addresses': List of IPv6 addresses.
- 'uris': List of URIs.
- 'san_critical': Boolean indicating if the SAN extension is critical.
"""
if not cert_req_template['extensions'].hasValue():
exc_msg = 'No extensions found in the request template.'
raise ValueError(exc_msg)
san_extensions = [
extension
for extension in cert_req_template['extensions']
if str(extension['extnID']) == ExtensionOID.SUBJECT_ALTERNATIVE_NAME.dotted_string
]
if len(san_extensions) != 1:
exc_msg = 'Exactly one SAN extension must be present in the request.'
raise ValueError(exc_msg)
san_extension = san_extensions[0]
san_critical = str(san_extension['critical']) == 'True'
san_extension_bytes = bytes(san_extension['extnValue'])
san_asn1, _ = decoder.decode(san_extension_bytes, asn1Spec=rfc2459.SubjectAltName())
dns_names = []
ipv4_addresses = []
ipv6_addresses = []
uris = []
for general_name in san_asn1:
name_type = general_name.getName()
value = general_name.getComponent()
if name_type == 'iPAddress':
try:
ipv4_addresses.append(ipaddress.IPv4Address(value.asOctets()))
except (ValueError, TypeError):
ipv6_addresses.append(ipaddress.IPv6Address(value.asOctets()))
elif name_type == 'dNSName':
dns_names.append(str(value))
elif name_type == 'uniformResourceIdentifier':
uris.append(str(value))
return {
'dns_names': dns_names,
'ipv4_addresses': ipv4_addresses,
'ipv6_addresses': ipv6_addresses,
'uris': uris,
'san_critical': san_critical,
}
@staticmethod
[docs]
def _issue_application_credential(
cert_req_template: rfc2511.CertReq,
public_key: PublicKey,
device: DeviceModel,
application_certificate_template: ApplicationCertificateTemplateNames
) -> IssuedCredentialModel:
"""Issues an application certificate for CMP CR."""
common_name = CmpRequestTemplateExtractorMixin.get_subject_common_name(cert_req_template)
domain = device.domain
if not domain:
err_msg = 'Device domain is not set.'
raise ValueError(err_msg)
# noinspection PyBroadException
try:
validity_not_before = convert_rfc2459_time(cert_req_template['validity']['notBefore'])
validity_not_after = convert_rfc2459_time(cert_req_template['validity']['notAfter'])
validity_in_days = (validity_not_after - validity_not_before).days
except Exception: # noqa: BLE001
validity_in_days = DEFAULT_VALIDITY_DAYS
if application_certificate_template == ApplicationCertificateTemplateNames.TLS_CLIENT:
issuer = LocalTlsClientCredentialIssuer(device=device, domain=domain)
return issuer.issue_tls_client_certificate(
common_name=common_name, validity_days=validity_in_days, public_key=public_key
)
# Below certificate templates require the SubjectAltName extension
san = CmpResponseBuilderMixin._parse_san_extension(cert_req_template)
if application_certificate_template == ApplicationCertificateTemplateNames.TLS_SERVER:
tls_server_issuer = LocalTlsServerCredentialIssuer(device=device, domain=domain)
return tls_server_issuer.issue_tls_server_certificate(
common_name=common_name,
validity_days=validity_in_days,
ipv4_addresses=san['ipv4_addresses'],
ipv6_addresses=san['ipv6_addresses'],
san_critical=san['san_critical'],
domain_names=san['dns_names'],
public_key=public_key,
)
# OPC UA
if (application_certificate_template in
[ApplicationCertificateTemplateNames.OPCUA_SERVER, ApplicationCertificateTemplateNames.OPCUA_CLIENT]):
application_uri = str(san['uris'][0]) if san['uris'] else None
if not application_uri:
err_msg = 'Missing OPC UA Application URI in SAN extension'
raise ValueError(err_msg)
if application_certificate_template == ApplicationCertificateTemplateNames.OPCUA_SERVER:
opc_ua_server_cred_issuer = OpcUaServerCredentialIssuer(device=device, domain=domain)
return opc_ua_server_cred_issuer.issue_opc_ua_server_certificate(
common_name=common_name,
application_uri=application_uri,
ipv4_addresses=san['ipv4_addresses'],
ipv6_addresses=san['ipv6_addresses'],
# TODO (FHKatCSW): san_critical not supported in OpcUaServerCredentialIssuer # noqa: FIX002
#san_critical=san['san_critical'], # noqa: ERA001
domain_names=san['dns_names'],
validity_days=validity_in_days,
public_key=public_key,
)
if application_certificate_template == ApplicationCertificateTemplateNames.OPCUA_CLIENT:
opc_ua_client_cred_issuer = OpcUaClientCredentialIssuer(device=device, domain=domain)
return opc_ua_client_cred_issuer.issue_opc_ua_client_certificate(
common_name=common_name,
application_uri=application_uri,
# TODO (FHKatCSW): san_critical not supported in OpcUaClientCredentialIssuer # noqa: FIX002
#san_critical=san['san_critical'], # noqa: ERA001
validity_days=validity_in_days,
public_key=public_key,
)
exc_msg = f'The app cert template {application_certificate_template} is not supported.'
raise ValueError(exc_msg)
@staticmethod
@staticmethod
[docs]
def _add_protection_shared_secret(
pki_message: rfc4210.PKIMessage, hmac_gen: hmac.HMAC,
) -> rfc4210.PKIMessage:
"""Adds HMAC-based shared-secret protection to the base PKI message."""
# TODO(AlexHx8472): Use fresh salt! # noqa: FIX002
encoded_protected_part = get_encoded_protected_part(pki_message)
hmac_gen.update(encoded_protected_part)
hmac_digest = hmac_gen.finalize()
binary_stuff = bin(int.from_bytes(hmac_digest, byteorder='big'))[2:].zfill(160)
pki_message['protection'] = rfc4210.PKIProtection(univ.BitString(binary_stuff)).subtype(
explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 0)
)
return pki_message
[docs]
def _sign_pki_message(
self, pki_message: rfc4210.PKIMessage, signer_credential: CredentialModel
) -> rfc4210.PKIMessage:
"""Applies signature-based protection to the base PKI message."""
encoded_protected_part = get_encoded_protected_part(pki_message)
private_key = load_pem_private_key(
signer_credential.private_key.encode(), password=None
)
signature_suite = SignatureSuite.from_certificate(signer_credential.get_certificate())
hash_algorithm = signature_suite.algorithm_identifier.hash_algorithm
if hash_algorithm is None:
err_msg = 'Failed to get the corresponding hash algorithm.'
raise ValueError(err_msg)
if isinstance(private_key, rsa.RSAPrivateKey):
signature = private_key.sign(
encoded_protected_part,
padding.PKCS1v15(),
hash_algorithm.hash_algorithm(),
)
elif isinstance(private_key, ec.EllipticCurvePrivateKey):
signature = private_key.sign(
encoded_protected_part,
ec.ECDSA(hash_algorithm.hash_algorithm()),
)
else:
exc_msg = 'Cannot sign due to unsupported private key type.'
raise TypeError(exc_msg)
pki_message['protection'] = rfc4210.PKIProtection(univ.BitString.fromOctetString(signature)).subtype(
explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 0)
)
return pki_message
[docs]
def get_encoded_protected_part(cmp_message: rfc4210.PKIMessage) -> Any: # bytes?
"""Encode the protected part of the CMP message."""
protected_part = rfc4210.ProtectedPart()
protected_part['header'] = cmp_message['header']
protected_part['infoValue'] = cmp_message['body']
return encoder.encode(protected_part)
@method_decorator(csrf_exempt, name='dispatch')
[docs]
class CmpInitializationRequestView(
CmpHttpMixin,
CmpRequestedDomainExtractorMixin,
CmpPkiMessageSerializerMixin,
CmpRequestTemplateExtractorMixin,
CmpResponseBuilderMixin,
View
):
"""Handles CMP Initialization Request Messages."""
[docs]
http_method_names = ('post',)
[docs]
serialized_pyasn1_message: rfc4210.PKIMessage
[docs]
requested_domain: DomainModel
[docs]
device: None | DeviceModel = None
[docs]
def _extract_ir_body(self) -> rfc4210.PKIBody:
message_body_name = self.serialized_pyasn1_message['body'].getName()
if message_body_name != 'ir':
err_msg = f'Expected CMP IR body, but got CMP {message_body_name.upper()} body.'
raise ValueError(err_msg)
ir_body = self.serialized_pyasn1_message['body']['ir']
if len(ir_body) > 1:
err_msg = 'multiple CertReqMessages found for IR.'
raise ValueError(err_msg)
if len(ir_body) < 1:
err_msg = 'no CertReqMessages found for IR.'
raise ValueError(err_msg)
return ir_body
[docs]
def _build_base_ip_message(
self,
issued_cred: IssuedCredentialModel,
issuer_credential: CredentialModel,
sender_kid: rfc2459.KeyIdentifier,
signer_credential: CredentialModel | None = None,
) -> rfc4210.PKIMessage:
"""Builds the IP response message (without the protection)."""
ip_header = self._build_response_message_header(
serialized_pyasn1_message=self.serialized_pyasn1_message,
sender_kid=sender_kid,
issuer_credential=signer_credential if signer_credential else issuer_credential)
ip_extra_certs = univ.SequenceOf()
certificate_chain = [
issuer_credential.get_certificate(),
*issuer_credential.get_certificate_chain(),
]
if signer_credential and issuer_credential.pk != signer_credential.pk:
# Include both the DevOwnerID (signer) and the Issuer CA in extraCerts
signer_chain = [
signer_credential.get_certificate(),
*signer_credential.get_certificate_chain(),
]
certificate_chain = signer_chain + certificate_chain
for certificate in certificate_chain:
der_bytes = certificate.public_bytes(encoding=Encoding.DER)
asn1_certificate, _ = decoder.decode(der_bytes, asn1Spec=rfc4210.CMPCertificate())
ip_extra_certs.append(asn1_certificate)
ip_body = rfc4210.PKIBody()
ip_body['ip'] = rfc4210.CertRepMessage().subtype(
explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 1)
)
ip_body['ip']['caPubs'] = univ.SequenceOf().subtype(
sizeSpec=rfc4210.constraint.ValueSizeConstraint(1, rfc4210.MAX),
explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 1),
)
# TODO(AlexHx8472): Add TLS Server Certificate Root CA # noqa: FIX002
cert_response = rfc4210.CertResponse()
cert_response['certReqId'] = 0
pki_status_info = rfc4210.PKIStatusInfo()
pki_status_info['status'] = 0
cert_response['status'] = pki_status_info
cmp_cert = rfc4210.CMPCertificate().subtype(
explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 0)
)
encoded_cert = issued_cred.credential.get_certificate().public_bytes(encoding=Encoding.DER)
der_cert, _ = decoder.decode(encoded_cert, asn1Spec=rfc4210.CMPCertificate())
cmp_cert.setComponentByName('tbsCertificate', der_cert['tbsCertificate'])
cmp_cert.setComponentByName('signatureValue', der_cert['signatureValue'])
cmp_cert.setComponentByName('signatureAlgorithm', der_cert['signatureAlgorithm'])
cert_or_enc_cert = rfc4210.CertOrEncCert()
cert_or_enc_cert['certificate'] = cmp_cert
cert_response['certifiedKeyPair']['certOrEncCert'] = cert_or_enc_cert
ip_body['ip']['response'].append(cert_response)
ip_message = rfc4210.PKIMessage()
ip_message['header'] = ip_header
ip_message['body'] = ip_body
for extra_cert in ip_extra_certs:
ip_message['extraCerts'].append(extra_cert)
return ip_message
[docs]
def _handle_shared_secret_initialization_request(
self) -> HttpResponse:
"""Handles IR for initial certificate requests with shared secret protection."""
if TYPE_CHECKING: # mypy does not know we only get here if self.device is not None
assert self.device is not None
if self.device.domain != self.requested_domain:
err_msg = 'The device domain does not match the requested domain.'
raise ValueError(err_msg)
config: OnboardingConfigModel | NoOnboardingConfigModel | None = None
shared_secret = None
if self.device.onboarding_config:
config = self.device.onboarding_config
shared_secret = config.cmp_shared_secret
elif self.device.no_onboarding_config:
config = self.device.no_onboarding_config
shared_secret = config.cmp_shared_secret
else:
err_msg = 'Device is not configured for shared secret authentication.'
raise ValueError(err_msg)
if not shared_secret:
err_msg = 'Device is misconfigured: shared secret is missing or empty.'
raise ValueError(err_msg)
req_message_body = self._extract_ir_body()
cert_req_template = self._extract_cert_req_template(req_message_body)
loaded_public_key = self._load_cert_req_public_key(cert_req_template)
# TODO(AlexHx8472): verify popo / process popo: popo = req_message_body[0]['pop'].prettyPrint() # noqa: FIX002
hmac_gen = self._verify_protection_shared_secret(
serialized_pyasn1_message=self.serialized_pyasn1_message,
shared_secret=shared_secret,
)
# Checks regarding contained public key and corresponding signature suite of the issuing CA
issuing_ca_credential = self.requested_domain.get_issuing_ca_or_value_error().credential
issuing_ca_cert = issuing_ca_credential.get_certificate()
signature_suite = SignatureSuite.from_certificate(issuing_ca_cert)
if not signature_suite.public_key_matches_signature_suite(loaded_public_key):
err_msg = 'Contained public key type does not match the signature suite.'
raise ValueError(err_msg)
# Issue the credential
if self.application_certificate_template: # application credential request
issued_cred = self._issue_application_credential(
cert_req_template=cert_req_template,
public_key=loaded_public_key,
device=self.device,
application_certificate_template=self.application_certificate_template
)
else: # domain credential request
issuer_domain_credential = LocalDomainCredentialIssuer(device=self.device, domain=self.device.domain)
issued_cred = issuer_domain_credential.issue_domain_credential_certificate(
public_key=loaded_public_key
)
# Build the IP response message
sender_kid = self.serialized_pyasn1_message['header']['senderKID']
ip_message = self._build_base_ip_message(
issued_cred=issued_cred, sender_kid=sender_kid, issuer_credential=issuing_ca_credential
)
ip_message = self._add_protection_shared_secret(
pki_message=ip_message, hmac_gen=hmac_gen
)
encoded_ip_message = encoder.encode(ip_message)
_decoded_ip_message, _ = decoder.decode(encoded_ip_message, asn1Spec=rfc4210.PKIMessage())
return HttpResponse(encoded_ip_message, content_type='application/pkixcmp', status=200)
[docs]
def _handle_signature_based_initialization_request( # noqa: C901
self) -> HttpResponse:
"""Handles IR for initial certificate requests with signature-based protection."""
# different protection algorithm than password-based MAC - certificate-based protection
extra_certs = self.serialized_pyasn1_message['extraCerts']
if extra_certs is None or len(extra_certs) == 0:
err_msg = 'No extra certificates found in the PKIMessage.'
raise ValueError(err_msg)
cmp_signer_extra_cert = extra_certs[0]
der_cmp_signer_cert = encoder.encode(cmp_signer_extra_cert)
cmp_signer_cert = x509.load_der_x509_certificate(der_cmp_signer_cert)
loaded_extra_cert = None
intermediate_certs = []
for extra_cert in extra_certs[1:]:
der_extra_cert = encoder.encode(extra_cert)
loaded_extra_cert = x509.load_der_x509_certificate(der_extra_cert)
# Do not include self-signed certs
if loaded_extra_cert.subject.public_bytes() != loaded_extra_cert.issuer.public_bytes():
intermediate_certs.append(loaded_extra_cert)
if not cmp_signer_cert: # was 'loaded_extra_cert', does that make any sense?
err_msg = 'CMP signer certificate missing in extra certs.'
raise ValueError(err_msg)
self.device = IDevIDAuthenticator.authenticate_idevid_from_x509(
idevid_cert=cmp_signer_cert,
intermediate_cas=intermediate_certs,
domain=None if self.is_aoki else self.requested_domain,
onboarding_protocol=OnboardingProtocol.CMP_IDEVID,
pki_protocol=OnboardingPkiProtocol.CMP,
)
if not self.device.domain:
return HttpResponse('Device domain is not set.', status=422)
self.requested_domain = self.device.domain
# device sanity checks
if not self.device.onboarding_config:
return HttpResponse(
'The corresponding device is not configured to use the onboarding mechanism.', status=404
)
if self.device.onboarding_config.onboarding_protocol != OnboardingProtocol.CMP_IDEVID:
return HttpResponse('Wrong onboarding protocol.')
req_message_body = self._extract_ir_body()
cert_req_template = self._extract_cert_req_template(req_message_body)
# Ensure subject common name is present
_common_name = self.get_subject_common_name(cert_req_template)
loaded_public_key = self._load_cert_req_public_key(cert_req_template)
# TODO(AlexHx8472): verify popo / process popo: popo = req_message_body[0]['pop'].prettyPrint() # noqa: FIX002
self._verify_protection_signature(
serialized_pyasn1_message=self.serialized_pyasn1_message,
cmp_signer_cert=cmp_signer_cert
)
# Checks regarding contained public key and corresponding signature suite of the issuing CA
issuing_ca_credential = self.requested_domain.get_issuing_ca_or_value_error().credential
issuing_ca_cert = issuing_ca_credential.get_certificate()
signer_credential = issuing_ca_credential
if (self.is_aoki):
owner_credential = AokiServiceMixin.get_owner_credential(cmp_signer_cert)
if not owner_credential:
return HttpResponse('No DevOwnerID present for this IDevID.', status=403)
signer_credential = owner_credential
signature_suite = SignatureSuite.from_certificate(issuing_ca_cert)
if not signature_suite.public_key_matches_signature_suite(loaded_public_key):
err_msg = 'Contained public key type does not match the signature suite.'
raise ValueError(err_msg)
issuer_domain_credential = LocalDomainCredentialIssuer(device=self.device, domain=self.requested_domain)
issued_cred = issuer_domain_credential.issue_domain_credential_certificate(
public_key=loaded_public_key
)
# Build the response PKI message
ski = x509.SubjectKeyIdentifier.from_public_key(signer_credential.get_certificate().public_key())
sender_kid = rfc2459.KeyIdentifier(ski.digest).subtype(
explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 2)
)
pki_message = self._build_base_ip_message(
issued_cred=issued_cred,
sender_kid=sender_kid,
issuer_credential=issuing_ca_credential,
signer_credential=signer_credential
)
pki_message = self._sign_pki_message(
pki_message=pki_message, signer_credential=signer_credential
)
encoded_message = encoder.encode(pki_message)
decoded_message, _ = decoder.decode(encoded_message, asn1Spec=rfc4210.PKIMessage())
self.device.onboarding_config.onboarding_status = OnboardingStatus.ONBOARDED
self.device.save()
return HttpResponse(encoded_message, content_type='application/pkixcmp', status=200)
[docs]
def post( # noqa: PLR0911
self,
request: HttpRequest,
*args: Any,
**kwargs: Any,
) -> HttpResponse:
"""Handles the POST requests to the CMP IR endpoint."""
del args, kwargs, request # request not accessed directly
self._check_header(serialized_pyasn1_message=self.serialized_pyasn1_message)
protection_algorithm = AlgorithmIdentifier.from_dotted_string(
self.serialized_pyasn1_message['header']['protectionAlg']['algorithm'].prettyPrint()
)
if protection_algorithm == AlgorithmIdentifier.PASSWORD_BASED_MAC:
if self.is_aoki:
return HttpResponse('AOKI only supported with signature-based protection (IDevID).', status=400)
try:
sender_kid = int(self.serialized_pyasn1_message['header']['senderKID'].prettyPrint())
self.device = DeviceModel.objects.get(pk=sender_kid)
except (DeviceModel.DoesNotExist, Exception):
return HttpResponse('Device not found.', status=404)
if (
self.device.no_onboarding_config
and self.device.no_onboarding_config.has_pki_protocol(NoOnboardingPkiProtocol.CMP_SHARED_SECRET)
):
if not self.application_certificate_template:
return HttpResponse('Missing application certificate template.', status=404)
return self._handle_shared_secret_initialization_request()
if (
self.device.onboarding_config
and self.device.onboarding_config.has_pki_protocol(OnboardingPkiProtocol.CMP)
):
if self.application_certificate_template:
return HttpResponse(
'Found application certificate template for domain credential certificate request.', status=404
)
return self._handle_shared_secret_initialization_request()
return HttpResponse('Invalid Request for corresponding device.', status=460)
return self._handle_signature_based_initialization_request()
@method_decorator(csrf_exempt, name='dispatch')
[docs]
class CmpCertificationRequestView(
CmpHttpMixin,
CmpRequestedDomainExtractorMixin,
CmpPkiMessageSerializerMixin,
CmpRequestTemplateExtractorMixin,
CmpResponseBuilderMixin,
View
):
"""Handles CMP Certification Request Messages."""
[docs]
http_method_names = ('post',)
[docs]
serialized_pyasn1_message: rfc4210.PKIMessage
[docs]
requested_domain: DomainModel
[docs]
application_certificate_template: None | ApplicationCertificateTemplateNames = None
[docs]
def _extract_cr_body(self) -> rfc4210.PKIBody:
message_body_name = self.serialized_pyasn1_message['body'].getName()
if message_body_name != 'cr':
err_msg = f'Expected CMP CR body, but got CMP {message_body_name.upper()} body.'
raise ValueError(err_msg)
cr_body = self.serialized_pyasn1_message['body']['cr']
if len(cr_body) > 1:
err_msg = 'multiple CertReqMessages found for CR.'
raise ValueError(err_msg)
if len(cr_body) < 1:
err_msg = 'no CertReqMessages found for CR.'
raise ValueError(err_msg)
return cr_body
[docs]
def _build_base_cp_message(
self,
issued_cred: IssuedCredentialModel,
issuer_credential: CredentialModel,
sender_kid: rfc2459.KeyIdentifier
) -> rfc4210.PKIMessage:
"""Builds the CR response message (without the protection)."""
cp_header = self._build_response_message_header(
serialized_pyasn1_message=self.serialized_pyasn1_message,
sender_kid=sender_kid,
issuer_credential=issuer_credential)
cp_extra_certs = univ.SequenceOf()
certificate_chain = [
issuer_credential.get_certificate(),
*issuer_credential.get_certificate_chain(),
]
for certificate in certificate_chain:
der_bytes = certificate.public_bytes(encoding=Encoding.DER)
asn1_certificate, _ = decoder.decode(der_bytes, asn1Spec=rfc4210.CMPCertificate())
cp_extra_certs.append(asn1_certificate)
cp_body = rfc4210.PKIBody()
cp_body['cp'] = rfc4210.CertRepMessage().subtype(
explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 3)
)
cp_body['cp']['caPubs'] = univ.SequenceOf().subtype(
sizeSpec=rfc4210.constraint.ValueSizeConstraint(1, rfc4210.MAX),
explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 1),
)
# TODO(AlexHx8472): Add TLS Server Certificate Root CA # noqa: FIX002
cert_response = rfc4210.CertResponse()
cert_response['certReqId'] = 0
pki_status_info = rfc4210.PKIStatusInfo()
pki_status_info['status'] = 0
cert_response['status'] = pki_status_info
cmp_cert = rfc4210.CMPCertificate().subtype(
explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 0)
)
encoded_cert = issued_cred.credential.get_certificate().public_bytes(encoding=Encoding.DER)
der_cert, _ = decoder.decode(encoded_cert, asn1Spec=rfc4210.CMPCertificate())
cmp_cert.setComponentByName('tbsCertificate', der_cert['tbsCertificate'])
cmp_cert.setComponentByName('signatureValue', der_cert['signatureValue'])
cmp_cert.setComponentByName('signatureAlgorithm', der_cert['signatureAlgorithm'])
cert_or_enc_cert = rfc4210.CertOrEncCert()
cert_or_enc_cert['certificate'] = cmp_cert
cert_response['certifiedKeyPair']['certOrEncCert'] = cert_or_enc_cert
cp_body['cp']['response'].append(cert_response)
cp_message = rfc4210.PKIMessage()
cp_message['header'] = cp_header
cp_message['body'] = cp_body
for extra_cert in cp_extra_certs:
cp_message['extraCerts'].append(extra_cert)
return cp_message
[docs]
def _handle_shared_secret_certificate_request(self) -> HttpResponse:
"""Handles CMP CR for application certificates with shared secret protection."""
if not self.application_certificate_template:
return HttpResponse('Missing application certificate template.', status=404)
try:
sender_kid = int(self.serialized_pyasn1_message['header']['senderKID'].prettyPrint())
self.device = DeviceModel.objects.get(pk=sender_kid)
except (DeviceModel.DoesNotExist, Exception):
return HttpResponse('Device not found.', status=404)
if not self.device.no_onboarding_config:
return HttpResponse(
'Password based MAC protected CMP messages while using CMP '
'CR message types are not allowed for onboarded devices. '
'Use signature based protection utilizing the domain credential',
status=422,
)
if not self.device.no_onboarding_config.has_pki_protocol(NoOnboardingPkiProtocol.CMP_SHARED_SECRET):
return HttpResponse(
'Received a password based MAC protected CMP message for a device that does not use the '
f'pki-protocol {NoOnboardingPkiProtocol.CMP_SHARED_SECRET.label}.',
status=422,
)
if self.device.domain != self.requested_domain:
exc_msg = 'The device domain does not match the requested domain.'
raise ValueError(exc_msg)
if self.device.no_onboarding_config.cmp_shared_secret == '':
err_msg = 'Device is misconfigured.'
raise ValueError(err_msg)
req_message_body = self._extract_cr_body()
cert_req_template = self._extract_cert_req_template(req_message_body)
# only local key-gen supported currently -> public key must be present
loaded_public_key = self._load_cert_req_public_key(cert_req_template)
# TODO(AlexHx8472): verify popo / process popo: popo = req_message_body[0]['pop'].prettyPrint() # noqa: FIX002
hmac_gen = self._verify_protection_shared_secret(
serialized_pyasn1_message=self.serialized_pyasn1_message,
shared_secret=self.device.no_onboarding_config.cmp_shared_secret,
)
# Checks regarding contained public key and corresponding signature suite of the issuing CA
issuing_ca_credential = self.requested_domain.get_issuing_ca_or_value_error().credential
issuing_ca_cert = issuing_ca_credential.get_certificate()
signature_suite = SignatureSuite.from_certificate(issuing_ca_cert)
if not signature_suite.public_key_matches_signature_suite(loaded_public_key):
err_msg = 'Contained public key type does not match the signature suite.'
raise ValueError(err_msg)
issued_app_cred = self._issue_application_credential(
cert_req_template=cert_req_template,
public_key=loaded_public_key,
device=self.device,
application_certificate_template=self.application_certificate_template
)
cp_message = self._build_base_cp_message(
issued_cred=issued_app_cred,
issuer_credential=issuing_ca_credential,
sender_kid=self.serialized_pyasn1_message['header']['senderKID']
)
cp_message = self._add_protection_shared_secret(
pki_message=cp_message, hmac_gen=hmac_gen,
)
encoded_cp_message = encoder.encode(cp_message)
decoded_cp_message, _ = decoder.decode(encoded_cp_message, asn1Spec=rfc4210.PKIMessage())
return HttpResponse(encoded_cp_message, content_type='application/pkixcmp', status=200)
[docs]
def _handle_signature_based_certificate_request( # noqa: PLR0912, PLR0915, C901
self) -> HttpResponse:
if not self.application_certificate_template:
return HttpResponse('Missing application certificate template.', status=404)
extra_certs = self.serialized_pyasn1_message['extraCerts']
if extra_certs is None or len(extra_certs) == 0:
err_msg = 'No extra certificates found in the PKIMessage.'
raise ValueError(err_msg)
cmp_signer_extra_cert = extra_certs[0]
der_cmp_signer_cert = encoder.encode(cmp_signer_extra_cert)
cmp_signer_cert = x509.load_der_x509_certificate(der_cmp_signer_cert)
device_id = int(cmp_signer_cert.subject.get_attributes_for_oid(x509.NameOID.USER_ID)[0].value)
device_serial_number = cmp_signer_cert.subject.get_attributes_for_oid(x509.NameOID.SERIAL_NUMBER)[0].value
domain_name = cmp_signer_cert.subject.get_attributes_for_oid(x509.NameOID.DOMAIN_COMPONENT)[0].value
common_name = cmp_signer_cert.subject.get_attributes_for_oid(x509.NameOID.COMMON_NAME)[0]
if isinstance(common_name.value, str):
common_name_value = common_name.value
elif isinstance(common_name.value, bytes):
common_name_value = common_name.value.decode()
else:
err_msg = 'Failed to parse common name value'
raise TypeError(err_msg)
if common_name_value != LocalDomainCredentialIssuer.DOMAIN_CREDENTIAL_CN:
err_msg = 'Not a domain credential.'
raise ValueError(err_msg)
try:
self.device = DeviceModel.objects.get(pk=device_id)
except DeviceModel.DoesNotExist:
return HttpResponse('Device not found.', status=404)
if device_serial_number != self.device.serial_number:
err_msg = 'SN mismatch'
raise ValueError(err_msg)
if not self.device.domain:
err_msg = 'The device is not part of any domain.'
raise ValueError(err_msg)
if domain_name != self.device.domain.unique_name:
err_msg = 'Domain mismatch.'
raise ValueError(err_msg)
issuing_ca_credential = self.device.domain.get_issuing_ca_or_value_error().credential
issuing_ca_cert = issuing_ca_credential.get_certificate()
# verifies the domain credential signature
cmp_signer_cert.verify_directly_issued_by(issuing_ca_cert)
if not self.device.onboarding_config:
return HttpResponse(
'The corresponding device is not configured to use the onboarding mechanism.', status=404
)
if not self.device.onboarding_config.has_pki_protocol(OnboardingPkiProtocol.CMP):
return HttpResponse('PKI protocol CMP client certificate expected, but got something else.')
req_message_body = self._extract_cr_body()
cert_req_template = self._extract_cert_req_template(req_message_body)
loaded_public_key = self._load_cert_req_public_key(cert_req_template)
# TODO(AlexHx8472): verify popo / process popo: popo = req_message_body[0]['pop'].prettyPrint() # noqa: FIX002
self._verify_protection_signature(
serialized_pyasn1_message=self.serialized_pyasn1_message,
cmp_signer_cert=cmp_signer_cert,
)
# Checks regarding contained public key and corresponding signature suite of the issuing CA
signature_suite = SignatureSuite.from_certificate(issuing_ca_cert)
if not signature_suite.public_key_matches_signature_suite(loaded_public_key):
err_msg = 'Contained public key type does not match the signature suite.'
raise ValueError(err_msg)
issued_cred = self._issue_application_credential(
cert_req_template=cert_req_template,
public_key=loaded_public_key,
device=self.device,
application_certificate_template=self.application_certificate_template
)
# Build the response PKI message
ski = x509.SubjectKeyIdentifier.from_public_key(issuing_ca_cert.public_key())
sender_kid = rfc2459.KeyIdentifier(ski.digest).subtype(
explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 2)
)
pki_message = self._build_base_cp_message(
issued_cred=issued_cred,
issuer_credential=issuing_ca_credential,
sender_kid=sender_kid
)
pki_message = self._sign_pki_message(
pki_message=pki_message, signer_credential=issuing_ca_credential)
encoded_message = encoder.encode(pki_message)
decoded_message, _ = decoder.decode(encoded_message, asn1Spec=rfc4210.PKIMessage())
return HttpResponse(encoded_message, content_type='application/pkixcmp', status=200)
[docs]
def post(
self,
request: HttpRequest,
*args: Any,
**kwargs: Any,
) -> HttpResponse:
"""Handles the POST requests to the CMP CR endpoint."""
del args, kwargs, request # request not accessed directly
self._check_header(serialized_pyasn1_message=self.serialized_pyasn1_message)
protection_algorithm = AlgorithmIdentifier.from_dotted_string(
self.serialized_pyasn1_message['header']['protectionAlg']['algorithm'].prettyPrint()
)
if protection_algorithm == AlgorithmIdentifier.PASSWORD_BASED_MAC:
return self._handle_shared_secret_certificate_request()
return self._handle_signature_based_certificate_request()
[docs]
def convert_rfc2459_time(time_obj: rfc2459.Time) -> datetime.datetime:
"""Convert a pyasn1_modules.rfc2459.Time object to a timezone-aware datetime (UTC).
The Time object is a CHOICE between:
- utcTime: YYMMDDHHMMSSZ
- generalizedTime: YYYYMMDDHHMMSSZ
Returns:
A datetime object in UTC.
Raises:
ValueError: If the time format is unexpected.
"""
time_field = time_obj.getName()
time_str = str(time_obj.getComponent())
if time_field == 'utcTime':
dt = datetime.datetime.strptime(time_str, '%y%m%d%H%M%SZ').astimezone(tz=datetime.UTC)
if dt.year >= UTC_TIME_THRESHOLD:
dt = dt.replace(year=dt.year - UTC_TIME_CORRECTION)
elif time_field == 'generalTime':
dt = datetime.datetime.strptime(time_str, '%Y%m%d%H%M%SZ').astimezone(tz=datetime.UTC)
else:
err_msg = f'Unexpected time field: {time_field}'
raise ValueError(err_msg)
return dt