"""AOKI Client for testing AOKI via CMP.
Please run from /rootdir/trustpoint with "uv run -m aoki.tests.cmp_client" for paths and imports to work.
This only works if your system OpenSSL version is 3.x.
"""
from __future__ import annotations
import logging
import subprocess
from pathlib import Path
from cryptography import x509
from cryptography.hazmat.primitives import hashes
[docs]
log = logging.getLogger('aoki.client')
# ruff: noqa: ERA001, T201
[docs]
CURRENT_DIR = Path(__file__).parent.resolve()
[docs]
CERTS_DIR = (CURRENT_DIR / './certs/').resolve()
[docs]
class AokiClientOwnerIdCertVerificationError(Exception):
"""Exception raised when the provided Owner ID certificate is invalid or not corresponding to the IDevID."""
[docs]
class AokiClientCertLoadError(Exception):
"""Exception raised when a certificate could not be loaded from the provided path."""
[docs]
class AokiCmpClient:
"""AOKI-CMP Client for testing purposes."""
[docs]
idevid_subj_sn : str = '_'
@staticmethod
[docs]
def _load_certificate(cert_path: Path) -> x509.Certificate:
try:
with cert_path.open('rb') as cert_file:
return x509.load_pem_x509_certificate(cert_file.read())
except FileNotFoundError as e:
exc_msg = f'Certificate file not found: {cert_path}'
raise AokiClientCertLoadError(exc_msg) from e
except ValueError as e:
exc_msg = f'Could not parse PEM format in certificate: {cert_path}'
raise AokiClientCertLoadError(exc_msg) from e
@staticmethod
[docs]
def _load_certificates(cert_path: Path) -> list[x509.Certificate]:
try:
with cert_path.open('rb') as cert_file:
return x509.load_pem_x509_certificates(cert_file.read())
except FileNotFoundError as e:
exc_msg = f'Certificate file not found: {cert_path}'
raise AokiClientCertLoadError(exc_msg) from e
except ValueError as e:
exc_msg = f'Could not parse PEM format in certificates: {cert_path}'
raise AokiClientCertLoadError(exc_msg) from e
[docs]
def _get_idevid_owner_san_uri(self, idevid_cert: x509.Certificate) -> str:
"""Get the Owner ID SAN URI corresponding to a IDevID certificate.
Formatted as "dev-owner:<idevid_subj_sn>.<idevid_x509_sn>.<idevid_sha256_fingerprint>"
"""
try:
sn_b = idevid_cert.subject.get_attributes_for_oid(x509.NameOID.SERIAL_NUMBER)[0].value
idevid_subj_sn = sn_b.decode() if isinstance(sn_b, bytes) else sn_b
except (ValueError, IndexError):
idevid_subj_sn = '_'
self.idevid_subj_sn = idevid_subj_sn
idevid_x509_sn = hex(idevid_cert.serial_number)[2:].zfill(16)
idevid_sha256_fingerprint = idevid_cert.fingerprint(hashes.SHA256()).hex()
return f'dev-owner:{idevid_subj_sn}.{idevid_x509_sn}.{idevid_sha256_fingerprint}'
[docs]
def _verify_matches_idevid_cert(self, owner_id_cert: x509.Certificate, idevid_cert: x509.Certificate) -> None:
"""Verify the Owner ID certificate is valid for the device IDevID."""
log.info('Verifying Owner ID certificate matches IDevID certificate')
idevid_san_uri = self._get_idevid_owner_san_uri(idevid_cert)
log.info('IDevID SAN URI: %s', idevid_san_uri)
for san in owner_id_cert.extensions.get_extension_for_oid(x509.ExtensionOID.SUBJECT_ALTERNATIVE_NAME).value:
if isinstance(san, x509.UniformResourceIdentifier) and san.value == idevid_san_uri:
log.info('Owner ID certificate SAN URI matches IDevID certificate!')
return
exc_msg = 'Owner ID certificate does not match IDevID certificate.'
raise AokiClientOwnerIdCertVerificationError(exc_msg)
def __init__(
self,
server_url: str,
cert_file: str,
key_file: str,
owner_truststore_file: str,
idevid_truststore_file: str,
*args: str, **kwargs: str
) -> None:
"""Initialize the AokiCmpClient."""
[docs]
self.server_url = server_url
[docs]
self.cert_file = cert_file
[docs]
self.key_file = key_file
[docs]
self.owner_truststore_file = owner_truststore_file
[docs]
self.idevid_truststore_file = idevid_truststore_file
[docs]
def onboard(self) -> None:
"""Run the AOKI-CMP Zero-Touch Device Onboarding process."""
# Step 1: Generate a new key for the domain credential
cmd = (
'openssl',
'genrsa',
'-out', f'{CERTS_DIR}/domain_credential_key.pem',
'2048',
)
subprocess.run(cmd, check=True) # noqa: S603
# Step 2: Execute the OpenSSL CMP command to request the domain credential
cmd = (
'openssl',
'cmp',
'-cmd', 'ir',
'-implicit_confirm',
'-server', self.server_url + '/.well-known/cmp/initialization/.aoki/',
'-cert', f'{CERTS_DIR}/{self.cert_file}',
'-key', f'{CERTS_DIR}/{self.key_file}',
'-extracerts', f'{CERTS_DIR}/{self.idevid_truststore_file}',
'-subject', '/CN=Trustpoint Domain Credential',
'-newkey', f'{CERTS_DIR}/domain_credential_key.pem',
'-certout', f'{CERTS_DIR}/dc_cert.pem',
'-chainout', f'{CERTS_DIR}/chain_without_root.pem',
'-extracertsout', f'{CERTS_DIR}/full_chain.pem',
'-trusted', f'{CERTS_DIR}/{self.owner_truststore_file}',
#'-tls_used'
)
try:
print(subprocess.check_output(cmd).decode()) # noqa: S603
except subprocess.CalledProcessError as e:
print('Error occurred while requesting domain credential:', e.output.decode())
return
# Step 3: Validate that the provided Owner ID certificate matches the IDevID certificate
# Assuming first extraCert is the OwnerID / CMP signer cert, this is the case in the Trustpoint implementation
owner_id_cert = self._load_certificates(CERTS_DIR / 'full_chain.pem')[0]
idevid_cert = self._load_certificate(CERTS_DIR / self.cert_file)
self._verify_matches_idevid_cert(owner_id_cert, idevid_cert)
print('AOKI-CMP Client Onboarding completed successfully!')
if __name__ == '__main__':
[docs]
client = AokiCmpClient(
server_url='https://localhost:443', # or 'http://localhost:8000' for dev
cert_file='idevid.pem',
key_file='idevid_pk.pem',
idevid_truststore_file='idevid_ca.pem',
owner_truststore_file='ownerid_ca.pem',
mdns = False, # not yet implemented
)
client.onboard()