workflows.services.engine

Workflow engine: advances instances step by step until they reach WAITING or a terminal outcome.

Functions

_current_step(inst)

_advance_pointer(inst)

_max_pass_hops(inst)

Upper bound on the number of PASS transitions from the current step.

_size_bytes(obj)

_deep_merge_no_overwrite(dst, src)

Deep-merge src into dst; if a leaf exists with a different value, raise ValueError.

_persist_step_context(inst, result)

Persist per-step compacted context if present.

_merge_global_vars(inst, result)

Merge vars into global $vars.

_handle_status(inst, status, signal)

Apply status to the instance and decide whether to continue.

advance_instance(inst[, signal])

Advance an instance until AWAITING or a terminal outcome is reached.

Module Contents

workflows.services.engine._current_step(inst)[source]
Parameters:

inst (workflows.models.WorkflowInstance)

Return type:

dict[str, Any]

workflows.services.engine._advance_pointer(inst)[source]
Parameters:

inst (workflows.models.WorkflowInstance)

Return type:

bool

workflows.services.engine._max_pass_hops(inst)[source]

Upper bound on the number of PASS transitions from the current step.

Parameters:

inst (workflows.models.WorkflowInstance)

Return type:

int

workflows.services.engine._size_bytes(obj)[source]
Parameters:

obj (Any)

Return type:

int

workflows.services.engine._deep_merge_no_overwrite(dst, src)[source]

Deep-merge src into dst; if a leaf exists with a different value, raise ValueError.

Parameters:
  • dst (dict[str, Any])

  • src (dict[str, Any])

Return type:

None
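
The merge rule described above can be illustrated with a minimal sketch. It is not the module's implementation: the public name deep_merge_no_overwrite, the error message, and the choice to treat equal leaves as a no-op are assumptions made for illustration.

```python
from typing import Any


def deep_merge_no_overwrite(dst: dict[str, Any], src: dict[str, Any]) -> None:
    """Merge src into dst in place; a conflicting leaf raises ValueError."""
    for key, value in src.items():
        if key not in dst:
            # New key: copy it over (nested dicts are shared, not cloned).
            dst[key] = value
        elif isinstance(dst[key], dict) and isinstance(value, dict):
            # Both sides are mappings: recurse instead of replacing.
            deep_merge_no_overwrite(dst[key], value)
        elif dst[key] != value:
            # Existing leaf with a different value: refuse to overwrite.
            raise ValueError(f"conflicting value for key {key!r}")


merged = {"a": {"x": 1}, "b": 2}
deep_merge_no_overwrite(merged, {"a": {"y": 3}, "b": 2})
```

Here merged gains the nested key "y" while "b" is left alone because both sides agree on its value; merging {"b": 5} afterwards would raise ValueError.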

workflows.services.engine._persist_step_context(inst, result)[source]

Persist per-step compacted context if present.

Parameters:
  • inst

  • result

Return type:

None

workflows.services.engine._merge_global_vars(inst, result)[source]

Merge vars into global $vars.

Returns:

True if processing can continue; False if the size of the merged vars exceeds VARS_MAX_BYTES, in which case the instance is marked FAILED.

Parameters:
  • inst

  • result

Return type:

bool
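
The size check described above might look roughly like the following sketch. Several things here are assumptions rather than facts about the module: the value of VARS_MAX_BYTES, measuring size as the length of the compact UTF-8 JSON encoding, the shallow merge, and the simplified signatures (plain dicts instead of an instance and a result). Marking the instance FAILED is left to the caller.

```python
import json
from typing import Any

VARS_MAX_BYTES = 64  # hypothetical limit; the real value is not given here


def size_bytes(obj: Any) -> int:
    """Assumed measure: byte length of the compact UTF-8 JSON encoding."""
    return len(json.dumps(obj, separators=(",", ":")).encode("utf-8"))


def merge_global_vars(global_vars: dict[str, Any], new_vars: dict[str, Any]) -> bool:
    """Merge new_vars into global_vars unless the result exceeds the cap."""
    candidate = {**global_vars, **new_vars}  # shallow merge, for illustration only
    if size_bytes(candidate) > VARS_MAX_BYTES:
        return False  # caller would mark the instance FAILED
    global_vars.update(new_vars)
    return True
```

Building the candidate first means an oversized merge leaves the existing vars untouched, which is one possible design; the real function may merge first and then check.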

workflows.services.engine._handle_status(inst, status, signal)[source]

Apply status to the instance and decide whether to continue.

Returns:

(should_continue, new_signal)

Parameters:
  • inst

  • status

  • signal

Return type:

tuple[bool, str | None]

workflows.services.engine.advance_instance(inst, signal=None)[source]

Advance an instance until AWAITING or a terminal outcome is reached.

Parameters:
  • inst

  • signal

Return type:

None
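
The overall control flow (run steps, follow PASS transitions up to a bound, stop on a waiting or terminal status) can be sketched as below. This is a simplified model under stated assumptions, not the module's code: steps are plain callables, the status names WAITING and DONE are hypothetical, the hop bound is taken to be the number of remaining steps, and a signal is assumed to be consumed by the first step that receives it.

```python
from typing import Callable, Optional

# Hypothetical status names used only in this sketch.
PASS, WAITING, FAILED, DONE = "PASS", "WAITING", "FAILED", "DONE"

Step = Callable[[Optional[str]], str]


def advance(steps: list[Step], start: int = 0,
            signal: Optional[str] = None) -> tuple[int, str]:
    """Run steps from index `start` until one waits, fails, or the list ends."""
    index = start
    max_hops = len(steps) - start  # assumed bound: one PASS per remaining step
    for _ in range(max_hops):
        status = steps[index](signal)
        signal = None  # assumption: only the first step sees the signal
        if status != PASS:
            return index, status  # waiting or terminal status stops the loop
        index += 1  # PASS: move the pointer to the next step
    return index, DONE


steps: list[Step] = [
    lambda s: PASS,
    lambda s: WAITING if s is None else PASS,  # waits until signalled
    lambda s: PASS,
]
first = advance(steps)
resumed = advance(steps, start=first[0], signal="resume")
```

The first call stops at step 1 with WAITING; the second call resumes from that step with a signal and runs to the end.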