Source code for commands.add_domains_and_devices

"""Adds Issuing CAs, Domains and Devices with different onboarding protocols."""

# ruff: noqa: T201  # print is fine in management commands

import random
import secrets
import string

from devices.models import DeviceModel, OnboardingConfigModel, NoOnboardingConfigModel
from django.core.management import call_command
from django.core.management.base import BaseCommand
from pki.models import DevIdRegistration, DomainModel, IssuingCaModel, TruststoreModel
from devices.models import OnboardingPkiProtocol, NoOnboardingPkiProtocol, OnboardingProtocol, OnboardingStatus


ALLOWED_CHARS = allowed_chars = string.ascii_letters + string.digits


[docs] def _get_secret(number_of_symbols: int = 16) -> str: """Generates a secret with the number of symbols provided. Args: number_of_symbols: Number of symbols of the generated secret. Defaults to 16. Returns: The generated secret. """ return ''.join(secrets.choice(allowed_chars) for _ in range(number_of_symbols))
[docs] def get_random_no_onboarding_pki_protocols() -> list[NoOnboardingPkiProtocol]: """Gets random allowed PkiProtocols. Args: include_protocol: This protocol will be included in the allowed list. Returns: A list of PkiProtocols. """ protocols = list(NoOnboardingPkiProtocol) num_choices = secrets.randbelow(len(protocols)) + 1 return random.sample(protocols, k=num_choices)
[docs] def get_random_onboarding_pki_protocols( include_protocol: OnboardingPkiProtocol | None = None) -> list[OnboardingPkiProtocol]: """Gets random allowed PkiProtocols. Args: include_protocol: This protocol will be included in the allowed list. Returns: A list of PkiProtocols. """ protocols = list(OnboardingPkiProtocol) if include_protocol: protocols.remove(include_protocol) num_choices = secrets.randbelow(len(protocols)) + 1 random_protocols = random.sample(protocols, k=num_choices) if include_protocol: random_protocols.append(include_protocol) return random_protocols
[docs] class Command(BaseCommand): """Add domains and associated device names with random onboarding protocol and serial number."""
[docs] help = 'Add domains and associated device names with random onboarding protocol and serial number'
[docs] def handle(self, *_args: tuple[str], **_kwargs: dict[str, str]) -> None: """Execute the command.""" call_command('create_multiple_test_issuing_cas') call_command('import_idevid_truststores') data = { 'arburg': [ 'ALLROUNDER-Injection-Molding-Machine', 'freeformer-3D-Printer', 'SELOGICA-Control-System', 'MULTILIFT-Robotic-Systems', 'ARBIDRIVE-Servo-Motor', 'ALS_Arburg-Leitrechner-System', ], 'homag': [ 'CENTATEQ-CNC-Processing-Center', 'EDGETEQ-Edge-Banding-Machine', 'powerTouch-Control', 'intelliGuide-Assist-System', 'DRILLTEQ-Drilling-and-Dowel-Insertion-Machine', 'STORETEQ-Storage-System', ], 'belden': [ 'Hirschmann-Industrial-Ethernet-Switches', 'Lumberg-Automation-Connectors', 'GarrettCom-Magnum-Routers', 'TROMPETER-Coaxial-Connectors', 'Belden-I_O-Modules', ], 'siemens': [ 'SIMATIC-PLC', 'SINAMICS-Drive-Systems', 'SIRIUS-Control-Devices', 'SIMOTICS-Electric-Motors', 'SIMATIC-HMI-Panels', 'SITOP-Power-Supplies', ], 'phoenix_contact': [ 'CLIPLINE-Terminal-Blocks', 'QUINT-Power-Supplies', 'PLCnext-Control', 'TERMITRAB-Surge-Protection', 'CONTACTRON-Motor-Starters', 'ME-PLC_Modular-Controller', ], 'schmalz': [ 'Vacuum-Generators', 'Vacuum-Grippers', 'Vacuum-Clamping-Systems', 'Suction-Pads', 'Vacuum-Layer-Grippers', 'Vacuum-Ejectors', ], } domain_ca_truststore_map = { 'arburg': ('issuing-ca-a', 'idevid-truststore-RSA-2048'), 'homag': ('issuing-ca-b', 'idevid-truststore-RSA-3072'), 'siemens': ('issuing-ca-c', 'idevid-truststore-RSA-4096'), 'belden': ('issuing-ca-d', 'idevid-truststore-EC-256'), 'phoenix_contact': ('issuing-ca-e', 'idevid-truststore-EC-283'), 'schmalz': ('issuing-ca-f', 'idevid-truststore-EC-570'), } onboarding_protocols = list(OnboardingProtocol) onboarding_protocols.remove(OnboardingProtocol.AOKI) onboarding_protocols.remove(OnboardingProtocol.BRSKI) onboarding_protocols.remove(OnboardingProtocol.MANUAL) onboarding_protocols.remove(OnboardingProtocol.CMP_IDEVID) onboarding_protocols.remove(OnboardingProtocol.EST_IDEVID) print('Starting the process of adding domains and devices...\n') for domain_name, devices in data.items(): issuing_ca_name, truststore_name = domain_ca_truststore_map[domain_name] issuing_ca = IssuingCaModel.objects.get(unique_name=issuing_ca_name) truststore = TruststoreModel.objects.get(unique_name=truststore_name) domain, created = DomainModel.objects.get_or_create(unique_name=domain_name) domain.issuing_ca = issuing_ca domain.save() if created: print(f'Created new domain: {domain_name}') else: print(f'Domain already exists: {domain_name}') devid_reg, devid_created = DevIdRegistration.objects.get_or_create( unique_name=f'devid-reg-{domain_name}', domain=domain, truststore=truststore, serial_number_pattern='^.*$', ) if devid_created: print(f"Created DevIdRegistration for domain '{domain_name}' with truststore '{truststore_name}'") else: print(f"DevIdRegistration already exists for domain '{domain_name}'") print(f'Domain({domain_name}, Issuing CA: {domain.issuing_ca})') device_uses_onboarding = random.choice([True, False]) # noqa: S311 for device_name in devices: random_device_type = random.choice( # noqa: S311 [DeviceModel.DeviceType.GENERIC_DEVICE, DeviceModel.DeviceType.OPC_UA_GDS]) serial_number = ''.join(random.choices(string.ascii_uppercase + string.digits, k=12)) # noqa: S311 if device_uses_onboarding: onboarding_protocol = random.choice(onboarding_protocols) # noqa: S311 print(f"Creating device '{device_name}' in domain '{domain_name}' with:") print(f' - Serial Number: {serial_number}') print(f' - Onboarding Protocol: {onboarding_protocol}') if onboarding_protocol == OnboardingProtocol.MANUAL: onboarding_pki_protocols = get_random_onboarding_pki_protocols() elif onboarding_protocol in [ OnboardingProtocol.EST_IDEVID, OnboardingProtocol.EST_USERNAME_PASSWORD]: onboarding_pki_protocols = get_random_onboarding_pki_protocols(OnboardingPkiProtocol.EST) elif onboarding_protocol in [ OnboardingProtocol.CMP_IDEVID, OnboardingProtocol.CMP_SHARED_SECRET]: onboarding_pki_protocols = get_random_onboarding_pki_protocols(OnboardingPkiProtocol.CMP) else: err_msg = 'Unknown onboarding protocol found.' raise ValueError(err_msg) idevid_trust_store = ( truststore if onboarding_protocol in [OnboardingProtocol.CMP_IDEVID, OnboardingProtocol.EST_IDEVID] else None ) onboarding_config_model = OnboardingConfigModel( onboarding_status=OnboardingStatus.PENDING, onboarding_protocol=onboarding_protocol, idevid_trust_store=idevid_trust_store ) onboarding_config_model.set_pki_protocols(onboarding_pki_protocols) if onboarding_protocol == OnboardingProtocol.CMP_SHARED_SECRET: onboarding_config_model.cmp_shared_secret = _get_secret() if onboarding_protocol == OnboardingProtocol.EST_USERNAME_PASSWORD: onboarding_config_model.est_password = _get_secret() onboarding_config_model.full_clean() device_model = DeviceModel( common_name=device_name, serial_number=serial_number, domain=domain, device_type=random_device_type, onboarding_config=onboarding_config_model ) device_model.full_clean() onboarding_config_model.save() device_model.save() else: serial_number = ''.join(random.choices(string.ascii_uppercase + string.digits, k=12)) # noqa: S311 print(f"Creating device '{device_name}' in domain '{domain_name}' with:") print(f' - Serial Number: {serial_number}') print(' - No Onboarding') no_onboarding_pki_protocols = get_random_no_onboarding_pki_protocols() no_onboarding_config_model = NoOnboardingConfigModel() no_onboarding_config_model.set_pki_protocols(no_onboarding_pki_protocols) if NoOnboardingPkiProtocol.CMP_SHARED_SECRET in no_onboarding_pki_protocols: no_onboarding_config_model.cmp_shared_secret = _get_secret() if NoOnboardingPkiProtocol.EST_USERNAME_PASSWORD in no_onboarding_pki_protocols: no_onboarding_config_model.est_password = _get_secret() no_onboarding_config_model.full_clean() device_model = DeviceModel( common_name=device_name, serial_number=serial_number, domain=domain, device_type=random_device_type ) device_model.no_onboarding_config = no_onboarding_config_model device_model.full_clean() no_onboarding_config_model.save() device_model.save() print(f'Device {device_name} created and saved.') print('\nProcess completed. All domains and devices have been added.')