workflows.services.executors.webhook

Webhook step executor.

Attributes

_CTX_PLACEHOLDER_RE

logger

Classes

WebhookExecutor

Execute an outbound HTTP call and optionally export values to $vars.

Functions

_lookup_ctx_path(ctx, path)

_render_ctx_placeholders(template, ctx)

_validate_url(url)

Validate the rendered URL and return (ok, error_message).

_get_step_params(instance)

Return the params dict for the current step, or raise if not found.

_extract_webhook_config(params)

_build_auth(auth_cfg)

_render_url(url_tpl, ctx)

_build_headers(headers_raw, ctx, bearer_token)

_build_body(body_raw, ctx)

_perform_request(method, url, options)

Perform HTTP request with comprehensive SSRF protection.

_build_step_context(method, url, resp, resp_json)

_build_flat_vars(resp, resp_json, result_to_raw, ...)

_make_error_context(message)

_safe_int(v, *, default)

_select_source_value(resp, resp_json, source)

_normalize_headers_for_lookup(headers)

_extract_from_path(resp, resp_json, norm_headers, ...)

_traverse(root, parts)

Module Contents

workflows.services.executors.webhook._CTX_PLACEHOLDER_RE
workflows.services.executors.webhook._lookup_ctx_path(ctx, path)
Parameters:
  • ctx (dict[str, Any])

  • path (str)

Return type:

Any

workflows.services.executors.webhook._render_ctx_placeholders(template, ctx)
Parameters:
  • template (str)

  • ctx (dict[str, Any])

Return type:

str
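
Neither helper above carries a docstring. As a rough illustration only, a dotted-path lookup and a placeholder renderer with these signatures might look like the sketch below. The `{{ ctx.some.path }}` placeholder syntax, the regex, and the choice to render missing values as an empty string are all assumptions and are not taken from the module.

```python
import re
from typing import Any

# Hypothetical placeholder syntax: {{ ctx.some.path }} (an assumption).
PLACEHOLDER_RE = re.compile(r"\{\{\s*ctx\.([A-Za-z0-9_.]+)\s*\}\}")


def lookup_ctx_path(ctx: dict[str, Any], path: str) -> Any:
    """Walk a dotted path through nested dicts; return None if a key is missing."""
    current: Any = ctx
    for part in path.split("."):
        if not isinstance(current, dict) or part not in current:
            return None
        current = current[part]
    return current


def render_ctx_placeholders(template: str, ctx: dict[str, Any]) -> str:
    """Replace each placeholder with the looked-up value (empty string if missing)."""

    def _sub(match: re.Match) -> str:
        value = lookup_ctx_path(ctx, match.group(1))
        return "" if value is None else str(value)

    return PLACEHOLDER_RE.sub(_sub, template)
```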

workflows.services.executors.webhook._validate_url(url)

Validate the rendered URL and return (ok, error_message).

Parameters:

url (str)

Return type:

tuple[bool, str]
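
The (ok, error_message) return shape can be illustrated with a minimal validator. This is a sketch under assumptions, not the module's actual checks: the allowed schemes and the error wording are hypothetical.

```python
from urllib.parse import urlparse


def validate_url(url: str) -> tuple[bool, str]:
    """Return (True, "") for an acceptable URL, else (False, reason)."""
    try:
        parsed = urlparse(url)
    except ValueError as exc:
        return False, f"invalid URL: {exc}"
    # Assumption: only plain HTTP(S) targets are acceptable.
    if parsed.scheme not in ("http", "https"):
        return False, f"unsupported scheme: {parsed.scheme or '(none)'}"
    if not parsed.hostname:
        return False, "URL has no host"
    return True, ""
```

Returning a tuple rather than raising lets the caller turn a rejected URL into a step error without a try/except around the call.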

workflows.services.executors.webhook.logger
class workflows.services.executors.webhook.WebhookExecutor

Bases: workflows.services.executors.factory.AbstractStepExecutor

Execute an outbound HTTP call and optionally export values to $vars.

  • Renders the URL, headers, and string bodies as Django templates against a 'ctx' dict.

  • Supports method, headers, body, auth (basic|bearer), and timeoutSecs.

  • Supports result_to/result_source for whole-response capture.

  • Supports fine-grained exports: [{"from_path": "json.foo", "to_path": "serial"}] (a bare key is allowed for to_path).

  • Stores per-step context and returns a flat vars map for $vars merging.

to_path:

Accepts either "serial" or "vars.serial"; an optional "vars." prefix is stripped and the value is stored under $vars.
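
The to_path rule can be sketched as a small normalizer. The helper name `normalize_to_path` is hypothetical, and the export entries reuse the example from the docstring.

```python
def normalize_to_path(to_path: str) -> str:
    """Strip an optional leading "vars." so both spellings map to the same key."""
    prefix = "vars."
    return to_path[len(prefix):] if to_path.startswith(prefix) else to_path


# Two export entries that differ only in how to_path is spelled.
exports = [
    {"from_path": "json.foo", "to_path": "serial"},
    {"from_path": "json.foo", "to_path": "vars.serial"},
]
keys = [normalize_to_path(entry["to_path"]) for entry in exports]
```

Under this sketch both entries resolve to the same $vars key, `serial`.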

do_execute(instance, _signal)

Execute the webhook step.

Parameters:
Returns:

ExecutorResult describing step outcome and exported vars.

Return type:

workflows.services.types.ExecutorResult

workflows.services.executors.webhook._get_step_params(instance)

Return the params dict for the current step, or raise if not found.

Parameters:

instance (workflows.models.WorkflowInstance)

Return type:

dict[str, Any]

workflows.services.executors.webhook._extract_webhook_config(params)
Parameters:

params (dict[str, Any])

Return type:

tuple[str, str, dict[str, Any], Any, int, Any, str, str, list[Any]]

workflows.services.executors.webhook._build_auth(auth_cfg)
Parameters:

auth_cfg (Any)

Return type:

tuple[Any, str | None]
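
The return type suggests a pair of (auth object, bearer token). A minimal sketch of how basic and bearer configurations could map onto that pair follows. The config keys `type`, `username`, `password`, and `token` are assumptions, as is returning a (username, password) tuple for basic auth.

```python
from typing import Any, Optional


def build_auth(auth_cfg: Any) -> tuple[Any, Optional[str]]:
    """Map an auth config onto (basic_auth, bearer_token); unknown input gives (None, None)."""
    if not isinstance(auth_cfg, dict):
        return None, None
    kind = str(auth_cfg.get("type", "")).lower()  # hypothetical key name
    if kind == "basic":
        # A (username, password) tuple is the form HTTP clients such as requests accept.
        return (auth_cfg.get("username", ""), auth_cfg.get("password", "")), None
    if kind == "bearer":
        # An empty token is treated the same as no token.
        return None, auth_cfg.get("token") or None
    return None, None
```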

workflows.services.executors.webhook._render_url(url_tpl, ctx)
Parameters:
  • url_tpl (str)

  • ctx (dict[str, Any])

Return type:

tuple[bool, str, str]

workflows.services.executors.webhook._build_headers(headers_raw, ctx, bearer_token)
Parameters:
  • headers_raw (dict[str, Any])

  • ctx (dict[str, Any])

  • bearer_token (str | None)

Return type:

dict[str, str]

workflows.services.executors.webhook._build_body(body_raw, ctx)
Parameters:
  • body_raw (Any)

  • ctx (dict[str, Any])

Return type:

tuple[bool, dict[str, Any], str]

workflows.services.executors.webhook._perform_request(method, url, options)

Perform HTTP request with comprehensive SSRF protection.

The URL has already been validated in _render_url(); it is validated once more here as a defense-in-depth measure, so that an unsafe URL cannot reach the HTTP request.

Parameters:
  • method (str)

  • url (str)

  • options (dict[str, Any])

Return type:

tuple[requests.Response | None, Any | None, str | None]
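
One common form of a final SSRF check is to resolve the host and refuse any address that is not globally routable. The sketch below shows that idea using only the standard library. Whether the module performs this particular check is an assumption, and the function name is hypothetical.

```python
import ipaddress
import socket


def host_resolves_to_public_ip(hostname: str) -> tuple[bool, str]:
    """Resolve hostname and reject it if any resolved address is not globally routable."""
    try:
        infos = socket.getaddrinfo(hostname, None)
    except socket.gaierror as exc:
        return False, f"could not resolve host: {exc}"
    for info in infos:
        # info[4][0] is the address string; drop any IPv6 zone suffix such as "%eth0".
        ip = ipaddress.ip_address(info[4][0].split("%")[0])
        if not ip.is_global:
            return False, f"blocked address: {ip}"
    return True, ""
```

A check like this resolves the name separately from the HTTP client, so it narrows a DNS-rebinding window but does not close it on its own.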

workflows.services.executors.webhook._build_step_context(method, url, resp, resp_json)
Parameters:
  • method (str)

  • url (str)

  • resp (requests.Response)

  • resp_json (Any | None)

Return type:

dict[str, Any]

workflows.services.executors.webhook._build_flat_vars(resp, resp_json, result_to_raw, result_source, exports)
Parameters:
  • resp (requests.Response)

  • resp_json (Any | None)

  • result_to_raw (str)

  • result_source (str)

  • exports (list[Any])

Return type:

dict[str, Any]

workflows.services.executors.webhook._make_error_context(message)
Parameters:

message (str)

Return type:

dict[str, Any]

workflows.services.executors.webhook._safe_int(v, *, default)
Parameters:
  • v (Any)

  • default (int)

Return type:

int
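
The signature suggests a tolerant integer conversion with a keyword-only fallback, for example when reading timeoutSecs from step params. A minimal sketch, assuming that any value that cannot be converted falls back to the default:

```python
from typing import Any


def safe_int(v: Any, *, default: int) -> int:
    """Convert v to int, returning default when the conversion fails."""
    try:
        return int(v)
    except (TypeError, ValueError, OverflowError):
        # TypeError: None or other non-numeric types; ValueError: strings like "abc";
        # OverflowError: float infinity.
        return default
```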

workflows.services.executors.webhook._select_source_value(resp, resp_json, source)
Parameters:
  • resp (requests.Response)

  • resp_json (Any | None)

  • source (str)

Return type:

Any

workflows.services.executors.webhook._normalize_headers_for_lookup(headers)
Parameters:

headers (Any)

Return type:

dict[str, Any]
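
HTTP header names are case-insensitive, so a lookup table is usually keyed by a normalized name. The sketch below lowercases keys. That choice, and returning an empty dict for unusable input, are assumptions about this helper.

```python
from typing import Any


def normalize_headers_for_lookup(headers: Any) -> dict[str, Any]:
    """Return a plain dict keyed by lowercased header name; unusable input yields {}."""
    try:
        items = dict(headers).items()
    except (TypeError, ValueError):
        return {}
    return {str(name).lower(): value for name, value in items}
```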

workflows.services.executors.webhook._extract_from_path(resp, resp_json, norm_headers, from_path)
Parameters:
  • resp (requests.Response)

  • resp_json (Any | None)

  • norm_headers (dict[str, Any])

  • from_path (str)

Return type:

Any

workflows.services.executors.webhook._traverse(root, parts)
Parameters:
  • root (Any)

  • parts (list[str])

Return type:

Any
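
A path such as "json.foo" has to be split into parts and walked through the decoded response. The sketch below shows one way to walk dicts by key and lists by numeric index. Returning None for a missing segment and supporting list indexes are assumptions, not documented behavior.

```python
from typing import Any


def traverse(root: Any, parts: list[str]) -> Any:
    """Follow parts through nested dicts and lists; return None if a segment is missing."""
    current = root
    for part in parts:
        if isinstance(current, dict):
            if part not in current:
                return None
            current = current[part]
        elif isinstance(current, list):
            # List segments must be non-negative decimal indexes within range.
            if not part.isdecimal() or int(part) >= len(current):
                return None
            current = current[int(part)]
        else:
            return None
    return current
```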