Source code for est.tests.client

import base64
import logging

import requests
from cryptography import x509
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives.serialization.pkcs7 import load_der_pkcs7_certificates

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')


[docs] class ESTClient: def __init__( self, est_url, auth_type, domain=None, cert_template=None, username=None, password=None, cert_path=None, key_path=None, ca_cert_path=None, out_cert_path=None, out_key_path=None, ): """Initialize the EST client with the necessary authentication parameters. :param est_url: Base URL for the EST service :param auth_type: Authentication type ('basic', 'mutual_tls', or 'both') :param domain: Domain for the EST endpoint :param cert_template: Certificate template type :param username: Username for Basic Auth (if applicable) :param password: Password for Basic Auth (if applicable) :param cert_path: Client certificate path for Mutual TLS :param key_path: Client private key path for Mutual TLS :param ca_cert_path: CA certificate path for verifying the EST server :param out_cert_path: Output path for the issued certificate :param out_key_path: Output path for the private key """
[docs] self.est_url = est_url.rstrip('/')
[docs] self.auth_type = auth_type
[docs] self.domain = domain
[docs] self.cert_template = cert_template
[docs] self.username = username
[docs] self.password = password
[docs] self.cert_path = cert_path
[docs] self.key_path = key_path
[docs] self.ca_cert_path = ca_cert_path
[docs] self.out_cert_path = out_cert_path
[docs] self.out_key_path = out_key_path
[docs] self.session = requests.Session()
logging.info('EST Client initialized with authentication type: %s', self.auth_type)
[docs] def _get_auth(self) -> tuple: """Returns authentication parameters based on the chosen method.""" auth = None cert = None if self.auth_type in ['basic', 'both'] and self.username and self.password: auth = (self.username, self.password) logging.info('Using Basic Authentication') if self.auth_type in ['mutual_tls', 'both'] and self.cert_path and self.key_path: cert = (self.cert_path, self.key_path) logging.info('Using Mutual TLS Authentication') return auth, cert
[docs] def enroll(self, common_name, serial_number, save_key=True) -> None: """Performs EST enrollment to obtain a new certificate, with an option to store the private key.""" private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048) if save_key: private_key_path = self.out_key_path or 'private_key.pem' with open(private_key_path, 'wb') as key_file: key_file.write( private_key.private_bytes( encoding=serialization.Encoding.PEM, format=serialization.PrivateFormat.TraditionalOpenSSL, encryption_algorithm=serialization.NoEncryption(), ) ) logging.info("Private key saved as '%s'", private_key_path) csr_builder = x509.CertificateSigningRequestBuilder() csr_builder = csr_builder.subject_name( x509.Name( [ x509.NameAttribute(x509.NameOID.COMMON_NAME, common_name), x509.NameAttribute(x509.NameOID.SERIAL_NUMBER, serial_number), ] ) ) csr = csr_builder.sign(private_key, hashes.SHA256()) csr_der = csr.public_bytes(encoding=serialization.Encoding.DER) logging.info(f'CSR (DER; hex dump): {csr_der.hex()}') if self.domain and self.cert_template: url = f'{self.est_url}/{self.domain}/{self.cert_template}/simpleenroll/' else: url = f'{self.est_url}/simpleenroll/' headers = {'Content-Type': 'application/pkcs10'} auth, cert = self._get_auth() logging.info('Sending CSR to %s', url) response = self.session.post(url, data=csr_der, headers=headers, auth=auth, cert=cert, verify=self.ca_cert_path) if response.status_code == 200: cert_der = response.content cert_pem = x509.load_der_x509_certificate(cert_der).public_bytes(encoding=serialization.Encoding.PEM) cert_path = self.out_cert_path or 'issued_cert.pem' with open(cert_path, 'wb') as cert_file: cert_file.write(cert_pem) logging.info("Certificate received and saved as '%s'", cert_path) else: logging.error('Enrollment failed: %s', response.text)
[docs] def reenroll(self, cert_path, key_path=None, generate_new_key=False) -> None: """Performs EST reenrollment using an existing certificate. :param cert_path: Path to the existing certificate :param key_path: Path to the existing private key (if using existing key) :param generate_new_key: Boolean to determine if a new key should be generated """ with open(cert_path, 'rb') as cert_file: existing_cert = x509.load_pem_x509_certificate(cert_file.read()) if generate_new_key or not key_path: logging.info('Generating a new private key for reenrollment.') private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048) key_path = 'reenrolled_' + (self.out_key_path or 'private_key.pem') with open(key_path, 'wb') as key_file: key_file.write( private_key.private_bytes( encoding=serialization.Encoding.PEM, format=serialization.PrivateFormat.TraditionalOpenSSL, encryption_algorithm=serialization.NoEncryption(), ) ) else: logging.info('Using the existing private key for reenrollment.') with open(key_path, 'rb') as key_file: private_key = serialization.load_pem_private_key(key_file.read(), password=None) csr_builder = x509.CertificateSigningRequestBuilder() csr_builder = csr_builder.subject_name(existing_cert.subject) logging.info(f'Reenrollment CSR subject: {existing_cert.subject}') try: san_extension = existing_cert.extensions.get_extension_for_class(x509.SubjectAlternativeName) except x509.ExtensionNotFound: pass else: csr_builder = csr_builder.add_extension( san_extension.value, critical=False, ) csr = csr_builder.sign(private_key, hashes.SHA256()) csr_der = csr.public_bytes(encoding=serialization.Encoding.DER) logging.info(f'CSR (DER; hex dump): {csr_der.hex()}') if self.domain and self.cert_template: url = f'{self.est_url}/{self.domain}/{self.cert_template}/simplereenroll/' else: url = f'{self.est_url}/simplereenroll/' headers = {'Content-Type': 'application/pkcs10'} auth, cert = self._get_auth() logging.info('Sending reenrollment CSR to %s', url) response = self.session.post(url, data=csr_der, headers=headers, auth=auth, cert=cert, verify=self.ca_cert_path) if response.status_code == 200: cert_der = response.content cert_pem = x509.load_der_x509_certificate(cert_der).public_bytes(encoding=serialization.Encoding.PEM) cert_path = 'reenrolled_' + (self.out_cert_path or 'cert.pem') with open(cert_path, 'wb') as cert_file: cert_file.write(cert_pem) logging.info("Reenrollment successful. Certificate saved as '%s'", cert_path) else: logging.error('Reenrollment failed: %s', response.text)
[docs] def get_ca_certificates(self) -> None: """Retrieves CA certificates from the EST /cacerts endpoint.""" url = f'{self.est_url}/{self.domain}/cacerts/' logging.info('Fetching CA certificates from %s', url) response = self.session.get(url, verify=self.ca_cert_path) if response.status_code == 200: der_data = ( base64.b64decode(response.content) if response.headers.get('Content-Transfer-Encoding', '').lower() == 'base64' else response.content ) certificates = load_der_pkcs7_certificates(der_data) for i, cert in enumerate(certificates): pem = cert.public_bytes(serialization.Encoding.PEM) logging.info('CA Certificate %d:\n%s', i + 1, pem.decode('utf-8')) with open(f'ca_cert{i}.pem', 'wb') as cert_file: cert_file.write(pem) else: logging.error('Failed to retrieve CA certificates: %s', response.text)
if __name__ == '__main__':
[docs] dc_client = ESTClient( est_url='https://localhost:443/.well-known/est', auth_type='mutual_tls',#'both', domain='arburg', cert_template='domaincredential', username=None,#'admin', password=None,#'testing321', cert_path='idevid.pem', key_path='idevid_pk.pem', ca_cert_path='trust_store.pem', out_cert_path='dc_cert.pem', out_key_path='dc_private_key.pem', )
# enroll Domain Credential dc_client.enroll(common_name='test2.example.com', serial_number='123456788', save_key=True) # dc_client.reenroll( # cert_path='dc_cert.pem', # key_path='dc_private_key.pem', # generate_new_key=False, # ) # client.get_ca_certificates() # use Domain Credential to request an application certificate app_client = ESTClient( est_url='https://localhost:443/.well-known/est', auth_type='mutual_tls',#'both', domain='arburg', cert_template='tlsclient', username=None,#'admin', password=None,#'testing321', cert_path='dc_cert.pem', key_path='dc_private_key.pem', ca_cert_path='trust_store.pem', out_cert_path='app_cert.pem', out_key_path='app_key.pem', ) #app_client.enroll(common_name='test4.example.com', serial_number='4232', save_key=True) app_reenroll_client = ESTClient( est_url='https://localhost:443/.well-known/est', auth_type='mutual_tls',#'both', domain='arburg', cert_template='tlsclient', username=None,#'admin', password=None,#'testing321', cert_path='app_cert.pem', key_path='app_key.pem', ca_cert_path='trust_store.pem', out_cert_path='app_cert.pem', out_key_path='app_key.pem', ) # re-enroll the application certificate #app_reenroll_client.reenroll( # cert_path='app_cert.pem', # key_path='app_key.pem', # generate_new_key=False, #)