Source code for pki.util.idevid

"""Classes for handling IDevID certificates according to IEEE 802.1AR."""

from __future__ import annotations

import re
import secrets
from typing import TYPE_CHECKING

from cryptography import x509
from cryptography.x509.verification import Criticality, ExtensionPolicy, PolicyBuilder, Store, VerificationError
from devices.models import (
    DeviceModel,
    OnboardingConfigModel,
    OnboardingPkiProtocol,
    OnboardingProtocol,
    OnboardingStatus,
)

from pki.models import DevIdRegistration, DomainModel, TruststoreModel
from pki.util.x509 import ApacheTLSClientCertExtractor
from trustpoint.logger import LoggerMixin

if TYPE_CHECKING:
    from django.http import HttpRequest


[docs] class IDevIDAuthenticationError(Exception): """Exception raised for IDevID authentication failures."""
[docs] class IDevIDExtensionPolicy: """Builder for IDevID extension policies.""" @staticmethod
[docs] def _idevid_base_policy() -> ExtensionPolicy: """Create an extension policy for all certificates in a IDevID PKI.""" policy = ExtensionPolicy.permit_all() # Require the presence of the Authority Key Identifier extension as per IEEE 802.1AR 8.10.1 return policy.require_present( x509.AuthorityKeyIdentifier, Criticality.AGNOSTIC, None )
@staticmethod
[docs] def idevid_ee_policy() -> ExtensionPolicy: """Create an extension policy for IDevID end-entity certificates.""" return IDevIDExtensionPolicy._idevid_base_policy()
@staticmethod
[docs] def idevid_ca_policy() -> ExtensionPolicy: """Create an extension policy for IDevID CA certificates.""" policy = IDevIDExtensionPolicy._idevid_base_policy() # Require the presence of the Subject Key Identifier extension as per IEEE 802.1AR 8.10.2 policy = policy.require_present( x509.SubjectKeyIdentifier, Criticality.AGNOSTIC, None ) # Require the presence of the Basic Constraints extension as per RFC 5280 # Note: There are conflicting requirements in RFC 5280 and IEEE 802.1AR 8.10 # with respect to CA Basic Constraints being critical return policy.require_present( x509.BasicConstraints, Criticality.CRITICAL, None )
[docs] class IDevIDVerifier(LoggerMixin): """Verifies IDevID certificates as used e.g. by EST with mutual TLS auth.""" @classmethod
[docs] def verify_idevid_against_truststore( cls, idevid_cert: x509.Certificate, intermediate_cas: list[x509.Certificate], truststore: TruststoreModel ) -> bool: """Verify the IDevID certificate against the provided truststore.""" # Need to check whether truststore has intended usage IDevID? if truststore.intended_usage != TruststoreModel.IntendedUsage.IDEVID: cls.logger.warning('Truststore %s is not intended for IDevID verification', truststore.unique_name) return False cls.logger.info('Verifying IDevID certificate against truststore %s', truststore.unique_name) certificates = truststore.get_certificate_collection_serializer().as_crypto() cls.logger.debug('Certificates in truststore: %s', certificates) store = Store(certificates) builder = PolicyBuilder().store(store) builder = builder.max_chain_depth(2) builder = builder.extension_policies( ca_policy=IDevIDExtensionPolicy.idevid_ca_policy(), ee_policy=IDevIDExtensionPolicy.idevid_ee_policy(), ) verifier = builder.build_client_verifier() try: _verified_client = verifier.verify(idevid_cert, intermediate_cas) except VerificationError as e: cls.logger.warning('IDevID verification failed for truststore %s: %s', truststore.unique_name, e) return False return True
[docs] class IDevIDAuthenticator(LoggerMixin): """Authenticates IDevID certificates as used e.g. by EST with mutual TLS auth.""" @staticmethod
[docs] def _get_matching_registrations(idevid_subj_sn: str, domain: DomainModel | None) -> list[DevIdRegistration]: """Get DevIdRegistration patters matching the given domain and serial number.""" domain_name = domain.unique_name if domain else 'Any' if domain: domain_registrations = DevIdRegistration.objects.filter(domain=domain) else: domain_registrations = DevIdRegistration.objects.all() if not domain_registrations.exists(): error_message = f'No registration patterns for requested domain {domain_name}.' raise IDevIDAuthenticationError(error_message) matching_registrations = [ r for r in domain_registrations if re.fullmatch(r.serial_number_pattern, idevid_subj_sn) ] if not matching_registrations: error_message = (f'No DevID registration pattern matching SN {idevid_subj_sn} ' f'for requested domain {domain_name}.') raise IDevIDAuthenticationError(error_message) return matching_registrations
@staticmethod
[docs] def _auto_create_device_from_idevid( idevid_cert: x509.Certificate, idevid_subj_sn: str, domain: DomainModel, pki_protocol: OnboardingPkiProtocol, onboarding_protocol: OnboardingProtocol, ) -> DeviceModel: """Auto-create a new DeviceModel from the IDevID certificate.""" try: cn_b = idevid_cert.subject.get_attributes_for_oid(x509.NameOID.COMMON_NAME)[0].value common_name = cn_b.decode() if isinstance(cn_b, bytes) else cn_b except (ValueError, IndexError): common_name = 'AutoGenDevice' if DeviceModel.objects.filter(common_name=common_name, domain=domain).exists(): common_name += f'_{secrets.token_hex(4)}' onboarding_config_model = OnboardingConfigModel( onboarding_status=OnboardingStatus.PENDING, onboarding_protocol=onboarding_protocol, ) onboarding_config_model.set_pki_protocols([pki_protocol]) device = DeviceModel( serial_number=idevid_subj_sn, common_name=common_name, domain=domain, onboarding_config=onboarding_config_model ) onboarding_config_model.full_clean() onboarding_config_model.save() device.full_clean() device.save() return device
@staticmethod
[docs] def get_subject_serial_number(idevid_cert: x509.Certificate) -> str: """Get the serial number from the subject of the IDevID certificate.""" try: sn_b = idevid_cert.subject.get_attributes_for_oid(x509.NameOID.SERIAL_NUMBER)[0].value except (ValueError, IndexError) as e: # TODO(Air): Check if we want to add field to associate IDevID with device by fingerprint # noqa: FIX002 # This would however be incompatible with the current approach to Registration patterns # One option is to modify the registration pattern to allow matching certain Issuer DN fields instead of SN error_message = 'IDevID certificates without a serial number in the subject DN are not supported.' raise IDevIDAuthenticationError(error_message) from e return sn_b.decode() if isinstance(sn_b, bytes) else sn_b
@classmethod
[docs] def authenticate_idevid_from_x509_no_device( cls, idevid_cert: x509.Certificate, intermediate_cas: list[x509.Certificate], domain: DomainModel | None = None ) -> tuple[DomainModel, str]: """Authenticate client using an IDevID certificate.""" idevid_subj_sn = cls.get_subject_serial_number(idevid_cert) matching_registrations = cls._get_matching_registrations(idevid_subj_sn, domain) # verify IDevID against Truststore for registration in matching_registrations: if (IDevIDVerifier.verify_idevid_against_truststore( idevid_cert=idevid_cert, intermediate_cas=intermediate_cas, truststore=registration.truststore, )): cls.logger.info( 'IDevID certificate with SN %s successfully verified against truststore %s', idevid_subj_sn, registration.truststore.unique_name ) return (registration.domain, idevid_subj_sn) error_message = (f'IDevID with SN {idevid_subj_sn} could not be verified against any truststore.') cls.logger.warning(error_message) raise IDevIDAuthenticationError(error_message)
@classmethod
[docs] def authenticate_idevid_from_x509( cls, idevid_cert: x509.Certificate, intermediate_cas: list[x509.Certificate], domain: DomainModel | None = None, onboarding_protocol: OnboardingProtocol = OnboardingProtocol.EST_IDEVID, pki_protocol: OnboardingPkiProtocol = OnboardingPkiProtocol.EST, ) -> DeviceModel: """Authenticate client using IDevID certificate for Domain Credential request and create a device.""" domain, idevid_subj_sn = cls.authenticate_idevid_from_x509_no_device( idevid_cert=idevid_cert, intermediate_cas=intermediate_cas, domain=domain ) # Check if we have a device with the same serial number existing_device = None try: existing_device = DeviceModel.objects.get( domain=domain, serial_number=idevid_subj_sn, ) except DeviceModel.DoesNotExist: pass except DeviceModel.MultipleObjectsReturned: error_message = (f'Multiple devices with the same serial number {idevid_subj_sn} ' f'found in domain {domain.unique_name}.') cls.logger.warning(error_message) cls.logger.warning('Auto-creating new device.') if existing_device: return existing_device return cls._auto_create_device_from_idevid( idevid_cert=idevid_cert, idevid_subj_sn=idevid_subj_sn, domain=domain, onboarding_protocol=onboarding_protocol, pki_protocol=pki_protocol )
@classmethod
[docs] def authenticate_idevid(cls, request: HttpRequest, domain: DomainModel | None = None) -> DeviceModel: """Authenticate client using IDevID certificate for Domain Credential request.""" idevid_cert, intermediate_cas = ApacheTLSClientCertExtractor.get_client_cert_as_x509(request) return cls.authenticate_idevid_from_x509( idevid_cert=idevid_cert, intermediate_cas=intermediate_cas, domain=domain )