request.gds_push.gds_push_service

Service for OPC UA GDS Push operations.

Exceptions

GdsPushError

Exception raised for GDS Push operation failures.

Classes

GdsPushService

Service for managing OPC UA GDS Push operations.

Module Contents

exception request.gds_push.gds_push_service.GdsPushError

Bases: Exception

Exception raised for GDS Push operation failures.

class request.gds_push.gds_push_service.GdsPushService(device, *, insecure=False)

Bases: trustpoint.logger.LoggerMixin

Service for managing OPC UA GDS Push operations.

This service handles secure communication with OPC UA servers and implements GDS Push workflows for certificate and trustlist management.

Architecture:
- Device provides connection info (IP, port)
- Device’s domain provides the CA hierarchy
- Domain credential provides client authentication
- Truststore (opc_trust_store) provides server validation
- TrustList (CA chain + CRLs) gets pushed to server

Parameters:
device: devices.models.DeviceModel
server_url: str
domain_credential: devices.models.IssuedCredentialModel | None
server_truststore: pki.models.truststore.TruststoreModel | None
_validate_device_config()

Validate that the device has the required configuration.

Raises:

GdsPushError – If device configuration is invalid.

Return type:

None

_setup_secure_mode()

Set up credentials and truststore for secure operations.

Raises:

GdsPushError – If secure configuration is incomplete.

Return type:

None

_get_domain_credential()

Get domain credential from device for client authentication.

Returns:

The most recent valid domain credential.

Raises:

GdsPushError – If no valid domain credential is found.

Return type:

devices.models.IssuedCredentialModel

_get_server_truststore()

Get truststore containing OPC UA server certificate.

This truststore is used to validate the OPC UA server during connection.

Returns:

Truststore from device’s onboarding config.

Raises:

GdsPushError – If the truststore is not configured or is empty.

Return type:

pki.models.truststore.TruststoreModel

async _build_ca_chain()

Build CA certificate chain from device’s domain issuing CA to root.

Returns:

List of CA models from issuing CA to root CA.

Raises:

GdsPushError – If chain is incomplete or invalid.

Return type:

list[pki.models.CaModel]
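
The walk from issuing CA to root described above can be sketched as follows. This is only an illustration under stated assumptions: `Ca`, its `parent` field, `ChainError` and `max_depth` are hypothetical stand-ins, not the real `pki.models.CaModel` API or `GdsPushError`.

```python
from dataclasses import dataclass
from typing import Optional


class ChainError(Exception):
    """Stand-in for GdsPushError in this sketch."""


@dataclass
class Ca:
    """Hypothetical CA record; parent=None marks a self-signed root."""
    name: str
    parent: Optional["Ca"] = None


def build_ca_chain(issuing_ca: Ca, max_depth: int = 10) -> list[Ca]:
    """Return the CAs ordered from the issuing CA up to the root."""
    chain: list[Ca] = []
    seen: set[int] = set()
    current: Optional[Ca] = issuing_ca
    while current is not None:
        # Guard against cyclic or unreasonably deep hierarchies.
        if id(current) in seen or len(chain) >= max_depth:
            raise ChainError("CA chain is cyclic or too long")
        seen.add(id(current))
        chain.append(current)
        current = current.parent
    return chain


root = Ca("root")
intermediate = Ca("intermediate", parent=root)
issuing = Ca("issuing", parent=intermediate)
chain_names = [ca.name for ca in build_ca_chain(issuing)]
```

The ordering matters for callers: the first element is the CA that issued the device credentials, the last one is the trust anchor.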

async _build_trustlist_for_server()

Build OPC UA TrustList to push to server.

The TrustList tells the OPC UA server which CAs to trust for client certificate validation. It includes:
- All CA certificates in the chain (issuing CA to root)
- Valid CRLs from all CAs in the chain (mandatory)

Returns:

TrustListDataType ready to push to server.

Raises:

GdsPushError – If trustlist cannot be built, any CA is missing a CRL, or any CRL is expired.

Return type:

asyncua.ua.TrustListDataType
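
A minimal sketch of the "CRL is mandatory and must not be expired" rule, assuming each CA is represented by a hypothetical `CaEntry` record. The field names, `TrustListError` and `collect_trustlist_material` are illustrative only; the real method returns an `asyncua.ua.TrustListDataType`, which this sketch does not construct.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Optional


class TrustListError(Exception):
    """Stand-in for GdsPushError in this sketch."""


@dataclass
class CaEntry:
    """Hypothetical view of one CA in the chain."""
    name: str
    cert_der: bytes
    crl_der: Optional[bytes]
    crl_next_update: Optional[datetime]


def collect_trustlist_material(chain, now):
    """Return (certificates, crls), failing if any CRL is missing or expired."""
    certs, crls = [], []
    for ca in chain:
        if ca.crl_der is None or ca.crl_next_update is None:
            raise TrustListError(f"CA {ca.name!r} has no CRL")
        if ca.crl_next_update <= now:
            raise TrustListError(f"CRL of CA {ca.name!r} is expired")
        certs.append(ca.cert_der)
        crls.append(ca.crl_der)
    return certs, crls


now = datetime(2024, 1, 1, tzinfo=timezone.utc)
chain = [
    CaEntry("issuing", b"cert-1", b"crl-1", now + timedelta(days=7)),
    CaEntry("root", b"cert-2", b"crl-2", now + timedelta(days=30)),
]
certs, crls = collect_trustlist_material(chain, now)
```

Failing early on a missing or stale CRL avoids pushing a TrustList that the server would be unable to use for revocation checking.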

async _get_client_credentials()

Get client certificate and private key from domain credential.

Returns:

Tuple of (certificate crypto object, private key PEM bytes).

Raises:

GdsPushError – If credentials are invalid.

Return type:

tuple[cryptography.x509.Certificate, bytes]

async _build_client_certificate_chain()

Build complete client certificate chain in PEM format.

OPC UA servers need the full chain to validate the client certificate. The chain includes: client cert + issuing CA + intermediate CAs + root CA.

Returns:

Complete certificate chain in PEM format (client cert + CA chain).

Raises:

GdsPushError – If chain cannot be built.

Return type:

bytes
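
The chain layout (client certificate first, then the CAs up to the root) can be illustrated with plain PEM concatenation. This is a sketch, not the service's implementation: `build_pem_chain` and `fake_pem` are hypothetical helpers, and the PEM bodies are placeholders rather than real certificates.

```python
def build_pem_chain(client_cert_pem: bytes, ca_chain_pem: list[bytes]) -> bytes:
    """Concatenate PEM blocks: client cert first, then issuing CA up to root."""
    blocks = [client_cert_pem, *ca_chain_pem]
    # Normalise whitespace so every block ends with exactly one newline.
    return b"".join(block.strip() + b"\n" for block in blocks)


def fake_pem(label: bytes) -> bytes:
    """Build a placeholder PEM block (not a valid certificate)."""
    return b"-----BEGIN CERTIFICATE-----\n" + label + b"\n-----END CERTIFICATE-----\n"


chain_pem = build_pem_chain(
    fake_pem(b"client"),
    [fake_pem(b"issuing"), fake_pem(b"root")],
)
```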

_verify_certificate_key_match(cert, private_key)

Verify that certificate and private key are a matching pair.

Parameters:
  • cert (cryptography.x509.Certificate) – The certificate.

  • private_key (Any) – The private key.

Raises:

GdsPushError – If certificate and key don’t match.

Return type:

None

_validate_client_certificate(cert)

Validate client certificate meets OPC UA requirements.

Parameters:

cert (cryptography.x509.Certificate) – The client certificate to validate.

Raises:

GdsPushError – If certificate doesn’t meet OPC UA requirements.

Return type:

None

_extract_application_uri(cert)

Extract application URI from certificate.

Parameters:

cert (cryptography.x509.Certificate) – The certificate to extract from.

Returns:

The application URI.

Raises:

GdsPushError – If no application URI is found.

Return type:

str
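
OPC UA application instance certificates carry the application URI as a URI entry in the subjectAltName extension. The lookup can be sketched as below, assuming the SAN entries are available as `(type, value)` pairs in the shape that the standard library's `ssl.SSLSocket.getpeercert()` uses. The real method takes a `cryptography.x509.Certificate`; `extract_application_uri` and `ApplicationUriError` here are hypothetical stand-ins.

```python
class ApplicationUriError(Exception):
    """Stand-in for GdsPushError in this sketch."""


def extract_application_uri(san_entries) -> str:
    """Return the first URI entry from a sequence of (type, value) SAN pairs."""
    for kind, value in san_entries:
        if kind == "URI" and value:
            return value
    raise ApplicationUriError("certificate has no URI subjectAltName entry")


san = (
    ("DNS", "plc-01.example.org"),
    ("URI", "urn:example:plc-01:server"),
)
application_uri = extract_application_uri(san)
```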

async _get_server_certificate()

Get OPC UA server certificate from truststore.

The truststore may contain multiple certificates (server cert + CA chain), but asyncua’s set_security() expects only the server certificate. The complete chain verification happens during the TLS handshake.

Returns:

Server certificate in DER format (first certificate with order=0).

Raises:

GdsPushError – If truststore is empty or misconfigured.

Return type:

bytes
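
Selecting the entry with order=0 from a truststore that also holds CA certificates might look like the sketch below. `TruststoreEntry`, its fields and `TruststoreError` are hypothetical; the real data comes from `pki.models.truststore.TruststoreModel`.

```python
from dataclasses import dataclass


class TruststoreError(Exception):
    """Stand-in for GdsPushError in this sketch."""


@dataclass
class TruststoreEntry:
    """Hypothetical truststore row: position in the store plus DER bytes."""
    order: int
    cert_der: bytes


def pick_server_certificate(entries: list[TruststoreEntry]) -> bytes:
    """Return the DER bytes of the entry with order 0 (the server certificate)."""
    if not entries:
        raise TruststoreError("truststore is empty")
    for entry in entries:
        if entry.order == 0:
            return entry.cert_der
    raise TruststoreError("truststore has no certificate with order=0")


entries = [
    TruststoreEntry(order=1, cert_der=b"issuing-ca"),
    TruststoreEntry(order=0, cert_der=b"server-cert"),
    TruststoreEntry(order=2, cert_der=b"root-ca"),
]
server_cert_der = pick_server_certificate(entries)
```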

async _create_secure_client()

Create OPC UA client with secure connection.

Returns:

Configured OPC UA client ready to connect.

Raises:

GdsPushError – If client creation fails.

Return type:

asyncua.Client

_create_insecure_client()

Create OPC UA client without security for discovery.

Returns:

Configured OPC UA client.

Return type:

asyncua.Client

async _log_certificate_mismatch_details(client)

Log essential information about certificate mismatch.

Parameters:

client (asyncua.Client) – The OPC UA client that failed to connect.

Return type:

None

async discover_server()

Discover OPC UA server information without authentication.

Returns:

Tuple of (success: bool, message: str, server_info: dict | None).

Return type:

tuple[bool, str, dict | None]
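
A caller would typically unpack the three-element result and branch on the success flag. The sketch below uses a hypothetical `StubService` in place of `GdsPushService` so that it runs without a device or an OPC UA server; the message text and the `endpoint_count` key are invented for illustration.

```python
import asyncio


class StubService:
    """Hypothetical stand-in that mimics the documented return shape."""

    async def discover_server(self):
        return True, "Discovered 2 endpoints", {"endpoint_count": 2}


async def run_discovery(service) -> str:
    """Unpack (success, message, server_info) and summarise the outcome."""
    success, message, server_info = await service.discover_server()
    if not success or server_info is None:
        return f"discovery failed: {message}"
    return f"discovery ok: {message}"


summary = asyncio.run(run_discovery(StubService()))
```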

_analyze_endpoints(endpoints)

Analyze endpoints to extract security policies and endpoint types.

Parameters:

endpoints (list[dict[str, Any]]) – List of endpoint dictionaries.

Returns:

Dictionary with analysis results.

Return type:

dict[str, Any]
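
One plausible shape for such an analysis is sketched below. The dictionary keys (`security_policy_uri`, `security_mode`) and the result fields are assumptions made for illustration; the actual keys used by the service are not documented here. OPC UA security policy URIs end in a fragment such as `#None` or `#Basic256Sha256`, which the sketch uses as the short policy name.

```python
from typing import Any


def analyze_endpoints(endpoints: list[dict[str, Any]]) -> dict[str, Any]:
    """Summarise which security policies and modes the endpoints offer."""
    policies: set[str] = set()
    modes: set[str] = set()
    for endpoint in endpoints:
        uri = endpoint.get("security_policy_uri", "")
        # Keep only the fragment after '#', e.g. 'Basic256Sha256'.
        policies.add(uri.rsplit("#", 1)[-1] or "Unknown")
        modes.add(endpoint.get("security_mode", "Unknown"))
    return {
        "security_policies": sorted(policies),
        "security_modes": sorted(modes),
        "has_secure_endpoint": any(p not in ("None", "Unknown") for p in policies),
        "has_insecure_endpoint": "None" in policies,
    }


endpoints = [
    {
        "security_policy_uri": "http://opcfoundation.org/UA/SecurityPolicy#None",
        "security_mode": "None",
    },
    {
        "security_policy_uri": "http://opcfoundation.org/UA/SecurityPolicy#Basic256Sha256",
        "security_mode": "SignAndEncrypt",
    },
]
analysis = analyze_endpoints(endpoints)
```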

async _gather_server_info(client)

Gather server information from connected client.

Parameters:

client (asyncua.Client) – Connected OPC UA client.

Returns:

Dictionary with server information.

Return type:

dict[str, Any]

async update_trustlist()

Update server trustlist with CA chain and CRLs.

Implements OPC UA Part 12 Section 7.7.3 UpdateTrustList workflow.

Returns:

Tuple of (success: bool, message: str).

Return type:

tuple[bool, str]

async _discover_trustlist_nodes(client)

Discover TrustList nodes on server.

Parameters:

client (asyncua.Client) – Connected OPC UA client.

Returns:

List of dictionaries with group and trustlist node information.

Return type:

list[dict[str, Any]]

async _update_single_trustlist(trustlist_node, trustlist_data, max_chunk_size=1024)

Update a single TrustList node.

Parameters:
  • trustlist_node (asyncua.ua.Node) – The TrustList node to update.

  • trustlist_data (asyncua.ua.TrustListDataType) – TrustListDataType containing certificates and CRLs.

  • max_chunk_size (int) – Maximum size of each write chunk.

Returns:

True if successful, False otherwise.

Return type:

bool
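
The role of max_chunk_size can be illustrated with a small chunking helper: the serialized TrustList is split into pieces of at most that many bytes, and each piece would be sent in its own write call. `iter_chunks` is a hypothetical helper and the payload is dummy data; the actual OPC UA method calls are omitted.

```python
from collections.abc import Iterator


def iter_chunks(data: bytes, max_chunk_size: int = 1024) -> Iterator[bytes]:
    """Yield consecutive slices of data, each at most max_chunk_size bytes."""
    if max_chunk_size <= 0:
        raise ValueError("max_chunk_size must be positive")
    for offset in range(0, len(data), max_chunk_size):
        yield data[offset:offset + max_chunk_size]


payload = bytes(2500)  # stand-in for a serialized TrustList
chunks = list(iter_chunks(payload, max_chunk_size=1024))
chunk_sizes = [len(chunk) for chunk in chunks]
```

Only the last chunk may be shorter than the limit, and joining the chunks in order reproduces the original payload.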

async update_server_certificate()

Update server certificate using CSR-based workflow.

Implements OPC UA Part 12 Section 7.7.4 UpdateCertificate workflow.

Returns:

Tuple of (success: bool, message: str, certificate: bytes | None).

Return type:

tuple[bool, str, bytes | None]

async _discover_certificate_groups(client)

Discover certificate groups on server.

Parameters:

client (asyncua.Client) – Connected OPC UA client.

Returns:

List of dictionaries with group information.

Return type:

list[dict[str, Any]]

async _update_single_certificate(client, certificate_group_id, certificate_type_id=None)

Update certificate for a single certificate group.

Parameters:
  • client (asyncua.Client) – Connected OPC UA client.

  • certificate_group_id (asyncua.ua.NodeId) – NodeId of the certificate group.

  • certificate_type_id (asyncua.ua.NodeId | None) – NodeId of certificate type.

Returns:

Tuple of (success: bool, certificate: bytes | None, issuer_chain: list[bytes] | None).

Return type:

tuple[bool, bytes | None, list[bytes] | None]

_raise_gds_push_error(msg)

Raise a GdsPushError with the given message.

Parameters:

msg (str) – The error message.

Return type:

None

async _sign_csr(csr_der)

Sign a Certificate Signing Request using the standardized certificate issuance workflow.

This method uses the same CertificateIssueProcessor workflow as EST to ensure consistent certificate issuance across all protocols.

Parameters:

csr_der (bytes) – DER-encoded CSR from OPC UA server.

Returns:

Tuple of (signed certificate DER, issuer chain as list of DER certs, issued certificate model).

Raises:

GdsPushError – If signing fails.

Return type:

tuple[bytes, list[bytes], pki.models.CertificateModel]

async _update_truststore_with_new_certificate(server_cert_der, issuer_chain)

Update the server truststore with the newly issued certificate and CA chain.

This ensures that the truststore contains the complete certificate chain (server certificate + CA certificates) that the OPC UA server will present during the TLS handshake.

Parameters:
  • server_cert_der (bytes) – DER-encoded server certificate.

  • issuer_chain (list[bytes]) – List of DER-encoded CA certificates.

Raises:

GdsPushError – If truststore update fails.

Return type:

None