Source code for pki.forms
"""Module for managing PKI-related forms in the Trustpoint application."""
from __future__ import annotations
from typing import Any, ClassVar, cast
from cryptography import x509
from cryptography.hazmat.primitives import hashes
from django import forms
from django.core.exceptions import ValidationError
from django.utils.translation import gettext_lazy as _
from trustpoint_core.serializer import (
CertificateCollectionSerializer,
CertificateSerializer,
CredentialSerializer,
PrivateKeySerializer,
)
from util.field import UniqueNameValidator, get_certificate_name
from pki.models import DevIdRegistration, IssuingCaModel, OwnerCredentialModel
from pki.models.certificate import CertificateModel
from pki.models.truststore import TruststoreModel, TruststoreOrderModel
from trustpoint.logger import LoggerMixin
[docs]
class DevIdAddMethodSelectForm(forms.Form):
"""Form for selecting the method to add an DevID Onboarding Pattern.
Attributes:
method_select (ChoiceField): A dropdown to select the method for adding an Issuing CA.
- `import_truststore`: Import a new truststore prior to configuring a new pattern.
- `configure_pattern`: Use an existing truststore to define a new pattern.
"""
[docs]
method_select = forms.ChoiceField(
label=_('Select Method'),
choices=[
('import_truststore', _('Import a new truststore prior to configuring a new pattern')),
('configure_pattern', _('Use an existing truststore to define a new pattern')),
],
initial='configure_pattern',
required=True,
)
[docs]
class DevIdRegistrationForm(forms.ModelForm[DevIdRegistration]):
"""Form to create a new DevIdRegistration."""
[docs]
class Meta: # noqa: D106
[docs]
fields: ClassVar[list[str]] = ['unique_name', 'truststore', 'domain', 'serial_number_pattern']
[docs]
widgets: ClassVar[dict[str, Any]] = {
'serial_number_pattern': forms.TextInput(
attrs={
'placeholder': 'Enter a regex pattern for serial numbers',
}
),
}
[docs]
labels: ClassVar[dict[str, str]] = {
'unique_name': 'Unique Name',
'truststore': 'Associated Truststore',
'domain': 'Associated Domain',
'serial_number_pattern': 'Serial Number Pattern (Regex)',
}
[docs]
unique_name = forms.CharField(
max_length=256,
label=_('[Optional] Unique Name'),
widget=forms.TextInput(attrs={'autocomplete': 'nope'}),
required=False,
validators=[UniqueNameValidator()],
)
[docs]
def clean(self) -> None:
"""Cleans and validates the form data.
Ensures the unique name is not already used if provided.
Raises:
ValidationError: If the unique name is not unique.
"""
cleaned_data = super().clean()
unique_name = cleaned_data.get('unique_name')
truststore_name = cleaned_data.get('truststore')
if not unique_name and truststore_name:
unique_name = truststore_name.unique_name
cleaned_data['unique_name'] = unique_name
if unique_name and DevIdRegistration.objects.filter(unique_name=unique_name).exists():
error_message = 'DevID Registration with the provided name already exists.'
raise ValidationError(error_message)
self.cleaned_data = cleaned_data
[docs]
class TruststoreAddForm(forms.Form):
"""Form for adding a new truststore.
This form handles the creation of a truststore by validating the unique name,
intended usage, and uploaded file. It ensures the unique name is not already
used and validates the truststore file content before saving.
Attributes:
unique_name (CharField): A unique name for the truststore.
intended_usage (ChoiceField): Specifies the intended usage of the truststore.
trust_store_file (FileField): The PEM or PKCS#7 file to be uploaded.
"""
[docs]
unique_name = forms.CharField(
max_length=256,
label=_('[Optional] Unique Name'),
widget=forms.TextInput(attrs={'autocomplete': 'nope'}),
required=False,
validators=[UniqueNameValidator()],
)
[docs]
intended_usage = forms.ChoiceField(
choices=TruststoreModel.IntendedUsage,
label=_('Intended Usage'),
widget=forms.Select(attrs={'class': 'form-control'}),
required=True,
)
[docs]
def clean(self) -> None:
"""Cleans and validates the form data.
Ensures the uploaded file can be read and validates the unique name
and intended usage fields. If validation passes, initializes and saves
the truststore.
Raises:
ValidationError: If the truststore file cannot be read, the unique name
is not unique, or an unexpected error occurs during initialization.
"""
cleaned_data = cast(dict[str, Any], super().clean())
unique_name = cleaned_data.get('unique_name')
intended_usage = cleaned_data.get('intended_usage')
try:
trust_store_file = cast(bytes, cleaned_data.get('trust_store_file').read())
except (OSError, AttributeError) as original_exception:
error_message = _(
'Unexpected error occurred while trying to get file contents. Please see logs for further details.'
)
raise ValidationError(error_message, code='unexpected-error') from original_exception
try:
certificate_collection_serializer = CertificateCollectionSerializer.from_bytes(trust_store_file)
except Exception as exception:
error_message = 'Unable to process the Truststore. May be malformed / corrupted.'
raise ValidationError(error_message) from exception
try:
certs = certificate_collection_serializer.as_crypto()
if not unique_name:
unique_name = get_certificate_name(certs[0])
if TruststoreModel.objects.filter(unique_name=unique_name).exists():
error_message = 'Truststore with the provided name already exists.'
raise ValidationError(error_message)
trust_store_model = self._save_trust_store(
unique_name=unique_name,
intended_usage=TruststoreModel.IntendedUsage(int(intended_usage)),
certificates=certs,
)
except Exception as exception:
raise ValidationError(str(exception)) from exception
self.cleaned_data['truststore'] = trust_store_model
@staticmethod
[docs]
def _save_trust_store(
unique_name: str, intended_usage: TruststoreModel.IntendedUsage, certificates: list[x509.Certificate]
) -> TruststoreModel:
saved_certs: list[CertificateModel] = []
for certificate in certificates:
sha256_fingerprint = certificate.fingerprint(algorithm=hashes.SHA256()).hex().upper()
try:
saved_certs.append(CertificateModel.objects.get(sha256_fingerprint=sha256_fingerprint))
except CertificateModel.DoesNotExist:
saved_certs.append(CertificateModel.save_certificate(certificate))
trust_store_model = TruststoreModel(unique_name=unique_name, intended_usage=intended_usage)
trust_store_model.save()
for number, certificate_model in enumerate(saved_certs):
trust_store_order_model = TruststoreOrderModel()
trust_store_order_model.order = number
trust_store_order_model.certificate = certificate_model
trust_store_order_model.trust_store = trust_store_model
trust_store_order_model.save()
return trust_store_model
[docs]
class TruststoreDownloadForm(forms.Form):
"""Form for downloading truststores in various formats.
This form provides options to customize the download of truststores, allowing
users to specify the container type, inclusion of certificate chains, and
the file format. It ensures flexibility in exporting truststores for
various use cases and environments.
Attributes:
cert_file_container (ChoiceField): Specifies the container type for the truststore.
- `single_file`: The entire truststore in a single file.
- `zip`: Certificates as separate files in a `.zip` archive.
- `tar_gz`: Certificates as separate files in a `.tar.gz` archive.
cert_chain_incl (ChoiceField): Specifies whether to include certificate chains.
- `cert_only`: Only the selected certificates.
- `chain_incl`: Include certificate chains.
cert_file_format (ChoiceField): Specifies the file format for the truststore.
- `pem`: PEM format (.pem, .crt, .ca-bundle).
- `der`: DER format (.der, .cer).
- `pkcs7_pem`: PKCS#7 format in PEM encoding (.p7b, .p7c, .keystore).
- `pkcs7_der`: PKCS#7 format in DER encoding (.p7b, .p7c, .keystore).
"""
[docs]
cert_file_container = forms.ChoiceField(
label=_('Select Truststore Container Type'),
choices=[
('single_file', _('Single File')),
('zip', _('Separate Certificate Files (as .zip file)')),
('tar_gz', _('Separate Certificate Files (as .tar.gz file)')),
],
initial='single_file',
required=True,
)
[docs]
cert_chain_incl = forms.ChoiceField(
label=_('Select Included Certificates'),
choices=[('cert_only', _('Selected certificates only')), ('chain_incl', _('Include certificate chains'))],
initial='selected_cert_only',
required=True,
)
[docs]
cert_file_format = forms.ChoiceField(
label=_('Select Truststore File Format'),
choices=[
('pem', _('PEM (.pem, .crt, .ca-bundle)')),
('der', _('DER (.der, .cer)')),
('pkcs7_pem', _('PKCS#7 (PEM) (.p7b, .p7c, .keystore)')),
('pkcs7_der', _('PKCS#7 (DER) (.p7b, .p7c, .keystore)')),
],
initial='pem',
required=True,
)
[docs]
class CertificateDownloadForm(forms.Form):
"""Form for downloading certificates in various formats.
This form allows users to customize the download options for certificates,
including the container type, inclusion of certificate chains, and the
file format. It provides flexibility to download certificates in the
desired structure and format for different use cases.
Attributes:
cert_file_container (ChoiceField): Specifies the container type for the downloaded certificates.
- `single_file`: All certificates in a single file.
- `zip`: Certificates as separate files in a `.zip` archive.
- `tar_gz`: Certificates as separate files in a `.tar.gz` archive.
cert_chain_incl (ChoiceField): Specifies whether to include certificate chains.
- `cert_only`: Only the selected certificates.
- `chain_incl`: Include certificate chains.
cert_file_format (ChoiceField): Specifies the file format for the certificates.
- `pem`: PEM format (.pem, .crt, .ca-bundle).
- `der`: DER format (.der, .cer).
- `pkcs7_pem`: PKCS#7 format in PEM encoding (.p7b, .p7c, .keystore).
- `pkcs7_der`: PKCS#7 format in DER encoding (.p7b, .p7c, .keystore).
"""
[docs]
cert_file_container = forms.ChoiceField(
label=_('Select Certificate Container Type'),
choices=[
('single_file', _('Single File')),
('zip', _('Separate Certificate Files (as .zip file)')),
('tar_gz', _('Separate Certificate Files (as .tar.gz file)')),
],
initial='single_file',
required=True,
)
[docs]
cert_chain_incl = forms.ChoiceField(
label=_('Select Included Certificates'),
choices=[('cert_only', _('Selected certificates only')), ('chain_incl', _('Include certificate chains'))],
initial='selected_cert_only',
required=True,
)
[docs]
cert_file_format = forms.ChoiceField(
label=_('Select Certificate File Format'),
choices=[
('pem', _('PEM (.pem, .crt, .ca-bundle)')),
('der', _('DER (.der, .cer)')),
('pkcs7_pem', _('PKCS#7 (PEM) (.p7b, .p7c, .keystore)')),
('pkcs7_der', _('PKCS#7 (DER) (.p7b, .p7c, .keystore)')),
],
initial='pem',
required=True,
)
[docs]
class IssuingCaAddMethodSelectForm(forms.Form):
"""Form for selecting the method to add an Issuing Certificate Authority (CA).
This form provides options to choose the method for adding a new Issuing CA.
Users can select between importing from a file, generating a key pair and
requesting an Issuing CA certificate, or configuring a remote Issuing CA.
Attributes:
method_select (ChoiceField): A dropdown to select the method for adding an Issuing CA.
- `local_file_import`: Import a new Issuing CA from a file.
- `local_request`: Generate a key-pair and request a certificate.
- `remote_est`: Configure a remote Issuing CA.
"""
[docs]
method_select = forms.ChoiceField(
label=_('Select Method'),
choices=[
('local_file_import', _('Import a new Issuing CA from file')),
('local_request', _('Generate a key-pair and request an Issuing CA certificate')),
('remote_est', _('Configure a remote Issuing CA')),
],
initial='local_file_import',
required=True,
)
[docs]
class IssuingCaFileTypeSelectForm(forms.Form):
"""Form for selecting the file type when importing an Issuing CA.
This form allows users to choose the type of file to use for importing an
Issuing Certificate Authority (CA). Supported formats include PKCS#12 and
other common certificate formats such as PEM, PKCS#1, PKCS#7, and PKCS#8.
Attributes:
method_select (ChoiceField): A dropdown to select the file type for the Issuing CA.
"""
# TODO(AlexHx8472): do we need .jks? Java Keystore
[docs]
method_select = forms.ChoiceField(
label=_('File Type'),
choices=[
('pkcs_12', _('PKCS#12')),
('other', _('PEM, PKCS#1, PKCS#7, PKCS#8')),
],
initial='pkcs_12',
required=True,
)
[docs]
class IssuingCaAddFileImportPkcs12Form(LoggerMixin, forms.Form):
"""Form for importing an Issuing CA using a PKCS#12 file.
This form allows the user to upload a PKCS#12 file containing the private key
and certificate chain, along with an optional password. It validates the
uploaded file and its contents and ensures the unique name is not already
used by another Issuing CA.
Attributes:
unique_name (CharField): A unique name for the Issuing CA.
pkcs12_file (FileField): The PKCS#12 file containing the private key and certificates.
pkcs12_password (CharField): An optional password for the PKCS#12 file.
"""
[docs]
unique_name = forms.CharField(
max_length=256,
label=_('[Optional] Unique Name'),
widget=forms.TextInput(attrs={'autocomplete': 'nope'}),
required=False,
validators=[UniqueNameValidator()],
)
[docs]
pkcs12_password = forms.CharField(
# hack, force autocomplete off in chrome with: one-time-code
widget=forms.PasswordInput(attrs={'autocomplete': 'one-time-code'}),
label=_('[Optional] PKCS#12 password'),
required=False,
)
[docs]
def clean(self) -> None:
"""Cleans and validates the entire form.
This method performs additional validation on the cleaned data to ensure
all required fields are valid and consistent. It checks the uploaded PKCS#12
file and its password (if provided) and validates that the unique name
does not conflict with existing entries. Any issues during validation
raise appropriate errors.
Raises:
ValidationError: If the data is invalid, such as when the unique name
is already taken or the PKCS#12 file cannot be read or parsed.
"""
cleaned_data = super().clean()
if not cleaned_data: # only for typing, cleaned_data should always be a dict, but not entirely sure
exc_msg = 'No data was provided.'
raise ValidationError(exc_msg)
unique_name = cleaned_data.get('unique_name')
try:
pkcs12_raw = cleaned_data.get('pkcs12_file').read()
pkcs12_password = cleaned_data.get('pkcs12_password')
except (OSError, AttributeError) as original_exception:
# These exceptions are likely to occur if the file cannot be read or is missing attributes.
error_message = _(
'Unexpected error occurred while trying to get file contents. Please see logs for further details.'
)
raise ValidationError(error_message, code='unexpected-error') from original_exception
if pkcs12_password:
try:
pkcs12_password = pkcs12_password.encode()
except Exception as original_exception:
error_message = 'The PKCS#12 password contains invalid data, that cannot be encoded in UTF-8.'
raise ValidationError(error_message) from original_exception
else:
pkcs12_password = None
try:
credential_serializer = CredentialSerializer.from_pkcs12_bytes(pkcs12_raw, pkcs12_password)
except Exception as exception:
err_msg = _('Failed to parse and load the uploaded file. Either wrong password or corrupted file.')
raise ValidationError(err_msg) from exception
cert_crypto = credential_serializer.certificate
if cert_crypto.extensions.get_extension_for_class(x509.BasicConstraints).value.ca is False:
err_msg = 'The provided certificate is not a CA certificate.'
raise ValidationError(err_msg)
try:
if not unique_name:
unique_name = get_certificate_name(cert_crypto)
if IssuingCaModel.objects.filter(unique_name=unique_name).exists():
error_message = 'Unique name is already taken. Choose another one.'
raise ValidationError(error_message)
IssuingCaModel.create_new_issuing_ca(
unique_name=unique_name,
credential_serializer=credential_serializer,
issuing_ca_type=IssuingCaModel.IssuingCaTypeChoice.LOCAL_UNPROTECTED,
)
# TODO(AlexHx8472): Filter credentials and check if any issuing ca corresponds to it.
# TODO(AlexHx8472): If it does get and display the name of the issuing ca in the message.
# TODO(AlexHx8472): If not, give information about the credential usage that is already in the db.
except ValidationError:
raise
except Exception as exception:
err_msg = str(exception)
raise ValidationError(err_msg) from exception
[docs]
class IssuingCaAddFileImportSeparateFilesForm(LoggerMixin, forms.Form):
"""Form for importing an Issuing CA using separate files.
This form allows the user to upload a private key file, its password (optional),
an Issuing CA certificate file, and an optional certificate chain. The form
validates the uploaded files, ensuring they are correctly formatted, within
size limits, and not already associated with an existing Issuing CA.
Attributes:
unique_name (CharField): A unique name for the Issuing CA.
private_key_file (FileField): The private key file (.key, .pem).
private_key_file_password (CharField): An optional password for the private key.
ca_certificate (FileField): The Issuing CA certificate file (.cer, .der, .pem, .p7b, .p7c).
ca_certificate_chain (FileField): An optional certificate chain file.
"""
[docs]
unique_name = forms.CharField(
max_length=256,
label=_('[Optional] Unique Name'),
widget=forms.TextInput(attrs={'autocomplete': 'nope'}),
required=False,
validators=[UniqueNameValidator()],
)
[docs]
ca_certificate = forms.FileField(label=_('Issuing CA Certificate (.cer, .der, .pem, .p7b, .p7c)'), required=True)
[docs]
ca_certificate_chain = forms.FileField(label=_('[Optional] Certificate Chain (.pem, .p7b, .p7c).'), required=False)
[docs]
private_key_file_password = forms.CharField(
# hack, force autocomplete off in chrome with: one-time-code
widget=forms.PasswordInput(attrs={'autocomplete': 'one-time-code'}),
label=_('[Optional] Private Key File Password'),
required=False,
)
[docs]
def clean_private_key_file(self) -> PrivateKeySerializer:
"""Validates and parses the uploaded private key file.
This method checks if the private key file is provided, ensures it meets
size constraints, and validates its contents. If a password is provided,
it is used to decrypt the private key. Raises validation errors for missing,
oversized, or corrupted private key files.
Returns:
PrivateKeySerializer: A serializer containing the parsed private key.
Raises:
ValidationError: If the private key file is missing, too large, or
corrupted, or if the password is invalid or incompatible.
"""
private_key_file = self.cleaned_data.get('private_key_file')
private_key_file_password = (
self.data.get('private_key_file_password') if self.data.get('private_key_file_password') else None
)
if not private_key_file:
err_msg = 'No private key file was uploaded.'
raise forms.ValidationError(err_msg)
# max size: 64 kiB
max_size = 1024 * 64
if private_key_file.size > max_size:
err_msg = 'Private key file is too large, max. 64 kiB.'
raise ValidationError(err_msg)
try:
return PrivateKeySerializer.from_bytes(private_key_file.read(), private_key_file_password)
except Exception as exception:
err_msg = _('Failed to parse the private key file. Either wrong password or file corrupted.')
raise ValidationError(err_msg) from exception
[docs]
def clean_ca_certificate(self) -> CertificateSerializer:
"""Validates and parses the uploaded Issuing CA certificate file.
This method ensures the provided Issuing CA certificate file is valid and
not already associated with an existing Issuing CA in the database. If the
file is too large, corrupted, or already in use, a validation error is raised.
Returns:
CertificateSerializer: A serializer containing the parsed certificate.
Raises:
ValidationError: If the file is missing, too large, corrupted, or already
associated with an existing Issuing CA.
"""
ca_certificate = self.cleaned_data['ca_certificate']
if not ca_certificate:
err_msg = 'No Issuing CA file was uploaded.'
raise forms.ValidationError(err_msg)
# max size: 64 kiB
max_size = 1024 * 64
if ca_certificate.size > max_size:
err_msg = 'Issuing CA file is too large, max. 64 kiB.'
raise ValidationError(err_msg)
try:
certificate_serializer = CertificateSerializer.from_bytes(ca_certificate.read())
except Exception as exception:
err_msg = _('Failed to parse the Issuing CA certificate. Seems to be corrupted.')
raise ValidationError(err_msg) from exception
cert_crypto = certificate_serializer.as_crypto()
if cert_crypto.extensions.get_extension_for_class(x509.BasicConstraints).value.ca is False:
err_msg = 'The provided certificate is not a CA certificate.'
raise ValidationError(err_msg)
certificate_in_db = CertificateModel.get_cert_by_sha256_fingerprint(
certificate_serializer.as_crypto().fingerprint(algorithm=hashes.SHA256()).hex()
)
if certificate_in_db:
issuing_ca_qs = IssuingCaModel.objects.filter(credential__certificate=certificate_in_db)
if issuing_ca_qs.exists():
issuing_ca_in_db = issuing_ca_qs.first()
err_msg = (
f'Issuing CA {issuing_ca_in_db.unique_name} is already configured '
'with the same Issuing CA certificate.'
)
raise ValidationError(err_msg)
return certificate_serializer
[docs]
def clean_ca_certificate_chain(self) -> None | CertificateCollectionSerializer:
"""Validates and parses the uploaded Issuing CA certificate chain file.
This method checks if the optional certificate chain file is provided.
If present, it validates and attempts to parse the file into a collection
of certificates. Raises a validation error if parsing fails or the file
appears corrupted.
Returns:
CertificateCollectionSerializer: A serializer containing the parsed
certificate chain if provided.
Raises:
ValidationError: If the certificate chain cannot be parsed.
"""
ca_certificate_chain = self.cleaned_data['ca_certificate_chain']
if ca_certificate_chain:
try:
return CertificateCollectionSerializer.from_bytes(ca_certificate_chain.read())
except Exception as exception:
err_msg = _('Failed to parse the Issuing CA certificate chain. Seems to be corrupted.')
raise ValidationError(err_msg) from exception
return None
[docs]
def clean(self) -> None:
"""Cleans and validates the form data.
This method performs additional validation on the provided data,
such as ensuring the unique name, private key file, and certificates
are valid. It also initializes and saves the issuing CA configuration
if all checks pass.
Raises:
ValidationError: If the form data is invalid or there is an error during processing.
"""
try:
cleaned_data = super().clean()
if not cleaned_data:
return
unique_name = cleaned_data.get('unique_name')
private_key_serializer = cleaned_data.get('private_key_file')
ca_certificate_serializer = cleaned_data.get('ca_certificate')
ca_certificate_chain_serializer = (
cleaned_data.get('ca_certificate_chain') if cleaned_data.get('ca_certificate_chain') else None
)
if not private_key_serializer or not ca_certificate_serializer:
return
credential_serializer = CredentialSerializer.from_serializers(
private_key_serializer=private_key_serializer,
certificate_serializer=ca_certificate_serializer,
certificate_collection_serializer=ca_certificate_chain_serializer
)
pk = credential_serializer.private_key
cert = credential_serializer.certificate
if pk.public_key() != cert.public_key():
err_msg = 'The provided private key does not match the Issuing CA certificate.'
raise ValidationError(err_msg)
if not unique_name:
unique_name = get_certificate_name(cert)
if IssuingCaModel.objects.filter(unique_name=unique_name).exists():
error_message = 'Unique name is already taken. Choose another one.'
raise ValidationError(error_message)
IssuingCaModel.create_new_issuing_ca(
unique_name=unique_name,
credential_serializer=credential_serializer,
issuing_ca_type=IssuingCaModel.IssuingCaTypeChoice.LOCAL_UNPROTECTED,
)
# TODO(AlexHx8472): Filter credentials and check if any issuing ca corresponds to it.
# TODO(AlexHx8472): If it does get and display the name of the issuing ca in the message.
# TODO(AlexHx8472): If not, give information about the credential usage that is already in the db.
except ValidationError:
raise
except Exception as exception:
err_msg = str(exception)
raise ValidationError(err_msg) from exception
[docs]
class OwnerCredentialFileImportForm(LoggerMixin, forms.Form):
"""Form for importing an DevOwnerID using separate files.
This form allows the user to upload a private key file, its password (optional),
an DevOwnerID certificate file, and an optional certificate chain. The form
validates the uploaded files, ensuring they are correctly formatted and within
size limits.
Attributes:
unique_name (CharField): A unique name for the Owner Credential.
private_key_file (FileField): The private key file (.key, .pem).
private_key_file_password (CharField): An optional password for the private key.
owner_certificate (FileField): The DevOwnerID certificate file (.cer, .der, .pem, .p7b, .p7c).
owner_certificate_chain (FileField): An optional certificate chain file.
"""
[docs]
unique_name = forms.CharField(
max_length=256,
label=_('[Optional] Unique Name'),
widget=forms.TextInput(attrs={'autocomplete': 'nope'}),
required=False,
validators=[UniqueNameValidator()],
)
[docs]
certificate = forms.FileField(label=_('DevOwnerID Certificate (.cer, .der, .pem, .p7b, .p7c)'), required=True)
[docs]
certificate_chain = forms.FileField(label=_('[Optional] Certificate Chain (.pem, .p7b, .p7c).'), required=False)
[docs]
private_key_file_password = forms.CharField(
# hack, force autocomplete off in chrome with: one-time-code
widget=forms.PasswordInput(attrs={'autocomplete': 'one-time-code'}),
label=_('[Optional] Private Key File Password'),
required=False,
)
[docs]
def clean_private_key_file(self) -> PrivateKeySerializer:
"""Validates and parses the uploaded private key file.
This method checks if the private key file is provided, ensures it meets
size constraints, and validates its contents. If a password is provided,
it is used to decrypt the private key. Raises validation errors for missing,
oversized, or corrupted private key files.
Returns:
PrivateKeySerializer: A serializer containing the parsed private key.
Raises:
ValidationError: If the private key file is missing, too large, or
corrupted, or if the password is invalid or incompatible.
"""
private_key_file = self.cleaned_data.get('private_key_file')
private_key_file_password = (
self.data.get('private_key_file_password') if self.data.get('private_key_file_password') else None
)
if not private_key_file:
err_msg = 'No private key file was uploaded.'
raise forms.ValidationError(err_msg)
# max size: 64 kiB
max_size = 1024 * 64
if private_key_file.size > max_size:
err_msg = 'Private key file is too large, max. 64 kiB.'
raise ValidationError(err_msg)
try:
return PrivateKeySerializer.from_bytes(private_key_file.read(), private_key_file_password)
except Exception as exception:
err_msg = _('Failed to parse the private key file. Either wrong password or file corrupted.')
raise ValidationError(err_msg) from exception
[docs]
def clean_certificate(self) -> CertificateSerializer:
"""Validates and parses the uploaded certificate file.
This method ensures the provided certificate file is valid and
not already associated with an existing DevOwnerID in the database. If the
file is too large, corrupted, or already in use, a validation error is raised.
Returns:
CertificateSerializer: A serializer containing the parsed certificate.
Raises:
ValidationError: If the file is missing, too large, corrupted, or already
associated with an existing Issuing CA.
"""
certificate = self.cleaned_data['certificate']
if not certificate:
err_msg = 'No certificate file was uploaded.'
raise forms.ValidationError(err_msg)
# max size: 64 kiB
max_size = 1024 * 64
if certificate.size > max_size:
err_msg = 'Certificate file is too large, max. 64 kiB.'
raise ValidationError(err_msg)
try:
certificate_serializer = CertificateSerializer.from_bytes(certificate.read())
except Exception as exception:
err_msg = _('Failed to parse the certificate. Seems to be corrupted.')
raise ValidationError(err_msg) from exception
certificate_in_db = CertificateModel.get_cert_by_sha256_fingerprint(
certificate_serializer.as_crypto().fingerprint(algorithm=hashes.SHA256()).hex()
)
if certificate_in_db:
credential_qs = OwnerCredentialModel.objects.filter(credential__certificate=certificate_in_db)
if credential_qs.exists():
credential_in_db = credential_qs.first()
err_msg = (
f'Owner Credential {credential_in_db.unique_name} is already configured '
'with the same DevOwnerID.'
)
raise ValidationError(err_msg)
return certificate_serializer
[docs]
def clean_certificate_chain(self) -> None | CertificateCollectionSerializer:
"""Validates and parses the uploaded certificate chain file.
This method checks if the optional certificate chain file is provided.
If present, it validates and attempts to parse the file into a collection
of certificates. Raises a validation error if parsing fails or the file
appears corrupted.
Returns:
CertificateCollectionSerializer: A serializer containing the parsed
certificate chain if provided.
Raises:
ValidationError: If the certificate chain cannot be parsed.
"""
ca_certificate_chain = self.cleaned_data['certificate_chain']
if ca_certificate_chain:
try:
return CertificateCollectionSerializer.from_bytes(ca_certificate_chain.read())
except Exception as exception:
err_msg = _('Failed to parse the certificate chain. Seems to be corrupted.')
raise ValidationError(err_msg) from exception
return None
[docs]
def clean(self) -> None:
"""Cleans and validates the form data.
This method performs additional validation on the provided data,
such as ensuring the unique name, private key file, and certificates
are valid. It also initializes and saves the OwnerCredential configuration
if all checks pass.
Raises:
ValidationError: If the form data is invalid or there is an error during processing.
"""
try:
cleaned_data = super().clean()
if not cleaned_data:
return
unique_name = cleaned_data.get('unique_name')
private_key_serializer = cleaned_data.get('private_key_file')
certificate_serializer = cleaned_data.get('certificate')
certificate_chain_serializer = (
cleaned_data.get('certificate_chain') if cleaned_data.get('certificate_chain') else None
)
if not private_key_serializer or not certificate_serializer:
return
if not unique_name:
name_from_cert = get_certificate_name(certificate_serializer.as_crypto())
if not name_from_cert:
return
unique_name = name_from_cert
if OwnerCredentialModel.objects.filter(unique_name=unique_name).exists():
error_message = 'Owner Credential with the provided name already exists.'
raise ValidationError(error_message)
cleaned_data['unique_name'] = unique_name
self.cleaned_data = cleaned_data
credential_serializer = CredentialSerializer.from_serializers(
private_key_serializer= private_key_serializer,
certificate_serializer=certificate_serializer,
certificate_collection_serializer=certificate_chain_serializer
)
OwnerCredentialModel.create_new_owner_credential(
unique_name=unique_name,
credential_serializer=credential_serializer,
)
except ValidationError:
raise
except Exception as exception:
err_msg = str(exception)
raise ValidationError(err_msg) from exception